Page MenuHomePhorge

No OneTemporary

Size
31 KB
Referenced Files
None
Subscribers
None
diff --git a/README.md b/README.md
index 661b67b..96e69c5 100644
--- a/README.md
+++ b/README.md
@@ -1,75 +1,75 @@
# Exile
-Exile is an alternative to beam [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure, non-blocking io, and tries to fix issues with ports.
+Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure, non-blocking io, and tries to fix ports issues.
+
+Exile is built around the idea of having demand-driven, asynchronous interaction with external process. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses NIF. See [Rationale](#rationale) for details. It also provides stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as
-Exile is built around the idea of having demand-driven, asynchronous interaction with external command. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses NIF. See [Rationale](#rationale) for details. It also provides stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as
``` elixir
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
`Exile.stream!` is a convenience wrapper around `Exile.Process`. If you want more control over stdin, stdout, and os process use `Exile.Process` directly.
**Note: Exile is experimental and it is still work-in-progress. Exile is based on NIF, please know the implications of it before using it**
## Rationale
Approaches, and issues
#### Port
Port is the default way of executing external commands. This is okay when you have control over the external program's implementation and the interaction is minimal. Port has several important issues.
* it can end up creating [zombie process](https://hexdocs.pm/elixir/Port.html#module-zombie-operating-system-processes)
* cannot selectively close stdin. This is required when the external programs act on EOF from stdin
* it sends command output as a message to the beam process. This does not put back pressure on the external program and leads exhausting VM memory
-#### Port based solutions
+#### Middleware based solutions
-There are many port based libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. These solve the first two issues associated with ports: zombie process and selectively closing STDIN. But not the third issue: having back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally uses port for reading the output and writing input. Note that these libraries are solving a different subset of issues and have different functionality, please check the relevant project page for details.
+Libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. solves the first two issues associated with ports - zombie process and selectively closing STDIN. But not the third issue - having back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally uses port for reading the output and writing input. Note that these libraries are solving a different subset of issues and have different functionality, please check the relevant project page for details.
* no back-pressure
* additional os process (middleware) for every execution of your program
* in few cases such as porcelain user has to install this external program explicitly
* might not be suitable when the program requires constant communication between beam process and external program
-Note that Unlike Exile, bugs in port based implementation does not bring down beam VM.
+On the plus side, unlike Exile, bugs in the implementation does not bring down whole beam VM.
#### [ExCmd](https://github.com/akash-akya/ex_cmd)
This is my other stab at solving back pressure on the external program issue. It implements a demand-driven protocol using [odu](https://github.com/akash-akya/odu) to solve this. Since ExCmd is also a port based solution, concerns previously mentioned applies to ExCmd too.
## Exile
-Exile does non-blocking, asynchronous io system calls using NIF. Since it makes non-blocking system calls, schedulers are never blocked indefinitely. It does this in asynchronous fashion using `enif_select` so its efficient.
+Internally Exile uses non-blocking asynchronous system calls to interact with the external process. It does not use port's message based communication instead does raw stdio using NIF. Uses `select()` based API for asynchronous IO. Most of the system calls are non-blocking, so it should not block the beam schedulers. Makes use of dirty-schedulers for IO.
-**Advantages over other approaches:**
+**Highlights**
-* solves all three issues associated with port
-* it does not use any middleware
+* Back pressure
+* it does not use any middleware program
* no additional os process. no performance/resource cost
* no need to install any external command
-* can run many external programs in parallel without adversely affecting schedulers
-* stream abstraction for interacting with the external program
-* should be portable across POSIX compliant operating systems (not tested)
+* tries to handle zombie process by attempting to cleanup external process. *But* as there is no middleware involved with exile so it is still possbile to endup with zombie process
+* stream abstraction
If you are running executing huge number of external programs **concurrently** (more than few hundred) you might have to increase open file descriptors limit (`ulimit -n`)
Non-blocking io can be used for other interesting things. Such as reading named pipe (FIFO) files. `Exile.stream!(~w(cat data.pipe))` does not block schedulers so you can open hundreds of fifo files unlike default `file` based io.
-##### TODO
-* add benchmarks results
+#### TODO
+* add benchmarks results
### 🚨 Obligatory NIF warning
-As with any NIF based solution, bugs or issues in Exile implementation **can bring down the beam VM**. But NIF implementation is comparatively small and mostly uses POSIX system calls, spawned external processes are still completely isolated at OS level and the port issues it tries to solve are critical.
+As with any NIF based solution, bugs or issues in Exile implementation **can bring down the beam VM**. But NIF implementation is comparatively small and mostly uses POSIX system calls. Also, spawned external processes are still completely isolated at OS level.
If all you want is to run a command with no communication, then just sticking with `System.cmd` is a better option.
### License
Copyright (c) 2020 Akash Hiremath.
Exile source code is released under Apache License 2.0. Check [LICENSE](LICENSE.md) for more information.
diff --git a/lib/exile.ex b/lib/exile.ex
index 7b1198f..c86ee36 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,57 +1,73 @@
defmodule Exile do
@moduledoc """
Exile is an alternative for beam ports with back-pressure and non-blocking IO
"""
use Application
@doc false
def start(_type, _args) do
opts = [
name: Exile.WatcherSupervisor,
strategy: :one_for_one
]
# we use DynamicSupervisor for cleaning up external processes on
# :init.stop or SIGTERM
DynamicSupervisor.start_link(opts)
end
@doc """
Runs the given command with arguments and return an Enumerable to read command output.
First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
- * `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
- 1. input as Enumerable:
- ```elixir
- # List
- Exile.stream!(~w(bc -q), input: ["1+1\n", "2*2\n"]) |> Enum.to_list()
-
- # Stream
- Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
- ```
- 2. input as collectable:
- If the input in a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. Return value of the function is ignored.
- ```elixir
- Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
- |> Enum.to_list()
- ```
- By defaults no input will be given to the command
- * `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
- * `chunk_size` - Size of each iodata chunk emitted by Enumerable stream. When set to `:unbuffered` the output is unbuffered and chunk size will be variable depending on the amount of data availble at that time. Defaults to 65535
+
+ * `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
+
+ * Enumerable:
+
+ ```
+ # List
+ Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
+ # Stream
+ Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
+ ```
+
+ * Collectable:
+
+ If the input in a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. Return value of the function is ignored.
+
+ ```
+ Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
+ |> Enum.to_list()
+ ```
+
+ By defaults no input will be given to the command
+
+ * `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
+
+ * `chunk_size` - Size of each iodata chunk emitted by Enumerable stream. When set to `:unbuffered` the output is unbuffered and chunk size will be variable depending on the amount of data availble at that time. Defaults to 65535
+
All other options are passed to `Exile.Process.start_link/3`
### Examples
- ``` elixir
+ ```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
"""
+ @type collectable_func() :: (Collectable.t() -> any())
+
+ @spec stream!(nonempty_list(String.t()),
+ input: Enum.t() | collectable_func(),
+ exit_timeout: timeout(),
+ chunk_size: pos_integer() | :unbuffered
+ ) :: ExCmd.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index f727a22..063014d 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,469 +1,508 @@
defmodule Exile.Process do
@moduledoc """
GenServer which wraps spawned external command.
- One should use `Exile.stream!` over `Exile.Process`. stream internally manages this server for you. Use this only if you need more control over the life-cycle OS process.
+ `Exile.stream!/1` should be preferred over this. Use this only if you need more control over the life-cycle of IO streams and OS process.
- ## Overview
- `Exile.Process` is an alternative primitive for Port. It has different interface and approach to running external programs to solve the issues associated with the ports.
+ ## Comparison with Port
- ### When compared to Port
- * it is demand driven. User explicitly has to `read` output of the command and the progress of the external command is controlled using OS pipes. so unlike Port, this never cause memory issues in beam by loading more than we can consume
- * it can close stdin of the program explicitly
- * does not create zombie process. It always tries to cleanup resources
+ * it is demand driven. User explicitly has to `read` the command output, and the progress of the external command is controlled using OS pipes. Exile never load more output than we can consume, so we should never experience memory issues
+ * it can close stdin while consuming stdout
+ * tries to handle zombie process by attempting to cleanup external process. Note that there is no middleware involved with exile so it is still possbile to endup with zombie process.
- At high level it makes non-blocking asynchronous system calls to execute and interact with the external program. It completely bypasses beam implementation for the same using NIF. It uses `select()` system call for asynchronous IO. Most of the system calls are non-blocking, so it does not has adverse effect on scheduler. Issues such as "scheduler collapse".
+ Internally Exile uses non-blocking asynchronous system calls to interact with the external process. It does not use port's message based communication, instead uses raw stdio and NIF. Uses `select()` based API for asynchronous IO. Most of the system calls are non-blocking, so it should not block the beam schedulers. Make use of dirty-schedulers for IO
"""
alias Exile.ProcessNif, as: Nif
require Logger
use GenServer
defmodule Error do
defexception [:message]
end
alias Exile.Process.Error
@default_opts [env: []]
@doc """
Starts `Exile.ProcessServer`
Starts external program using `cmd_with_args` with options `opts`
`cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
- * `cd` - the directory to run the command in
- * `env` - an enumerable of tuples containing environment key-value. These can be accessed in the external program
+ * `cd` - the directory to run the command in
+ * `env` - a list of tuples containing environment key-value. These can be accessed in the external program
"""
+ @type process :: pid
+ @spec start_link(nonempty_list(String.t()),
+ cd: String.t(),
+ env: [{String.t(), String.t()}]
+ ) :: {:ok, process} | {:error, any()}
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
with {:ok, args} <- normalize_args(cmd_with_args, opts) do
GenServer.start(__MODULE__, args)
end
end
+ @doc """
+ Closes external program's input stream
+ """
+ @spec close_stdin(process) :: :ok | {:error, any()}
def close_stdin(process) do
GenServer.call(process, :close_stdin, :infinity)
end
+ @doc """
+ Writes iodata `data` to program's input streams
+
+ This blocks when the pipe is full
+ """
+ @spec write(process, binary) :: :ok | {:error, any()}
def write(process, iodata) do
GenServer.call(process, {:write, IO.iodata_to_binary(iodata)}, :infinity)
end
+ @doc """
+ Return bytes written by the program to output stream.
+
+ This blocks until the programs write and flush the output depending on the `size`
+ """
+ @spec read(process, pos_integer()) ::
+ {:ok, iodata} | {:eof, iodata} | {:error, any()}
def read(process, size) when (is_integer(size) and size > 0) or size == :unbuffered do
GenServer.call(process, {:read, size}, :infinity)
end
def read(process) do
GenServer.call(process, {:read, :unbuffered}, :infinity)
end
+ @doc """
+ Sends signal to external program
+ """
+ @spec kill(process, :sigkill | :sigterm) :: :ok
def kill(process, signal) when signal in [:sigkill, :sigterm] do
GenServer.call(process, {:kill, signal}, :infinity)
end
+ @doc """
+ Waits for the program to terminate.
+
+ If the program terminates before timeout, it returns `{:ok, exit_status}` else returns `:timeout`
+ """
+ @spec await_exit(process, timeout: timeout()) :: {:ok, integer()} | :timeout
def await_exit(process, timeout \\ :infinity) do
GenServer.call(process, {:await_exit, timeout}, :infinity)
end
+ @doc """
+ Returns os pid of the command
+ """
+ @spec os_pid(process) :: pos_integer()
def os_pid(process) do
GenServer.call(process, :os_pid, :infinity)
end
+ @doc """
+ Stops the exile process, external program will be terminated in the background
+ """
+ @spec stop(process) :: :ok
def stop(process), do: GenServer.call(process, :stop, :infinity)
## Server
defmodule Pending do
+ @moduledoc false
defstruct bin: [], remaining: 0, client_pid: nil
end
defstruct [
:args,
:errno,
:port,
:socket_path,
:stdin,
:stdout,
:context,
:status,
await: %{},
pending_read: nil,
pending_write: nil
]
alias __MODULE__
def init(args) do
state = %__MODULE__{
args: args,
errno: nil,
status: :init,
await: %{},
pending_read: %Pending{},
pending_write: %Pending{}
}
{:ok, state, {:continue, nil}}
end
def handle_continue(nil, state) do
Elixir.Process.flag(:trap_exit, true)
%{cmd_with_args: cmd_with_args, cd: cd, env: env} = state.args
socket_path = socket_path()
uds = create_unix_domain_socket!(socket_path)
port = exec(cmd_with_args, socket_path, env, cd)
{:os_pid, os_pid} = Port.info(port, :os_pid)
Exile.Watcher.watch(self(), os_pid, socket_path)
{write_fd, read_fd} = receive_file_descriptors!(uds)
{:noreply,
%Process{
state
| port: port,
status: :start,
socket_path: socket_path,
stdin: read_fd,
stdout: write_fd
}}
end
def handle_call(:stop, _from, state) do
# TODO: pending write and read should receive "stopped" return
# value instead of exit signal
case state.status do
{:exit, _} -> :ok
_ -> Port.close(state.port)
end
{:stop, :normal, :ok, state}
end
def handle_call(:close_stdin, _from, state) do
case state.status do
{:exit, _} -> {:reply, :ok, state}
_ -> do_close(state, :stdin)
end
end
def handle_call({:await_exit, _}, _from, %{status: {:exit, status}} = state) do
{:reply, {:ok, {:exit, status}}, state}
end
def handle_call({:await_exit, timeout}, from, %{status: :start} = state) do
tref =
if timeout != :infinity do
Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
else
nil
end
{:noreply, %Process{state | await: Map.put(state.await, from, tref)}}
end
def handle_call(_, _from, %{status: {:exit, status}} = state) do
{:reply, {:error, {:exit, status}}, state}
end
def handle_call({:write, binary}, from, state) do
cond do
!is_binary(binary) ->
{:reply, {:error, :not_binary}, state}
state.pending_write.client_pid ->
{:reply, {:error, :pending_write}, state}
true ->
pending = %Pending{bin: binary, client_pid: from}
do_write(%Process{state | pending_write: pending})
end
end
def handle_call({:read, size}, from, state) do
if state.pending_read.client_pid do
{:reply, {:error, :pending_read}, state}
else
pending = %Pending{remaining: size, client_pid: from}
do_read(%Process{state | pending_read: pending})
end
end
def handle_call(:os_pid, _from, state) do
case Port.info(state.port, :os_pid) do
{:os_pid, os_pid} ->
{:reply, {:ok, os_pid}, state}
:undefined ->
Logger.debug("Process not alive")
{:reply, :undefined, state}
end
end
def handle_call({:kill, signal}, _from, state) do
{:reply, signal(state.port, signal), state}
end
def handle_info({:await_exit_timeout, from}, state) do
GenServer.reply(from, :timeout)
{:noreply, %Process{state | await: Map.delete(state.await, from)}}
end
def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
def handle_info({:select, _read_resource, _ref, :ready_input}, state), do: do_read(state)
def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state),
do: handle_port_exit(exit_status, state)
def handle_info({:EXIT, port, :normal}, %{port: port} = state), do: {:noreply, state}
def handle_info({:EXIT, _, reason}, state), do: {:stop, reason, state}
defp handle_port_exit(exit_status, state) do
Enum.each(state.await, fn {from, tref} ->
GenServer.reply(from, {:ok, {:exit, exit_status}})
if tref do
Elixir.Process.cancel_timer(tref)
end
end)
{:noreply, %Process{state | status: {:exit, exit_status}}, await: %{}}
end
defp do_write(%Process{pending_write: %Pending{bin: <<>>}} = state) do
GenServer.reply(state.pending_write.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
defp do_write(%Process{pending_write: pending} = state) do
case Nif.nif_write(state.stdin, pending.bin) do
{:ok, size} ->
if size < byte_size(pending.bin) do
binary = binary_part(pending.bin, size, byte_size(pending.bin) - size)
{:noreply, %{state | pending_write: %Pending{pending | bin: binary}}}
else
GenServer.reply(pending.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | errno: errno}}
end
end
defp do_read(%Process{pending_read: %Pending{remaining: :unbuffered} = pending} = state) do
case Nif.nif_read(state.stdout, -1) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, []})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
GenServer.reply(pending.client_pid, {:ok, binary})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %Process{state | pending_read: %Pending{}, errno: errno}}
end
end
defp do_read(%Process{pending_read: pending} = state) do
case Nif.nif_read(state.stdout, pending.remaining) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, pending.bin})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
if byte_size(binary) < pending.remaining do
pending = %Pending{
pending
| bin: [pending.bin | binary],
remaining: pending.remaining - byte_size(binary)
}
{:noreply, %Process{state | pending_read: pending}}
else
GenServer.reply(pending.client_pid, {:ok, [state.pending_read.bin | binary]})
{:noreply, %Process{state | pending_read: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | pending_read: %Pending{}, errno: errno}}
end
end
defp do_close(state, type) do
fd =
if type == :stdin do
state.stdin
else
state.stdout
end
case Nif.nif_close(fd) do
:ok ->
{:reply, :ok, state}
{:error, errno} ->
# FIXME: correct
raise errno
{:reply, {:error, errno}, %Process{state | errno: errno}}
end
end
defp normalize_cmd(cmd) do
path = System.find_executable(cmd)
if path do
{:ok, to_charlist(path)}
else
{:error, "command not found: #{inspect(cmd)}"}
end
end
defp normalize_cmd(_cmd_with_args) do
{:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
end
defp normalize_cmd_args([_ | args]) do
if is_list(args) && Enum.all?(args, &is_binary/1) do
{:ok, Enum.map(args, &to_charlist/1)}
else
{:error, "command arguments must be list of strings. #{inspect(args)}"}
end
end
defp normalize_cd(nil), do: {:ok, ''}
defp normalize_cd(cd) do
if File.exists?(cd) && File.dir?(cd) do
{:ok, to_charlist(cd)}
else
{:error, "`:cd` must be valid directory path"}
end
end
defp normalize_env(nil), do: {:ok, []}
defp normalize_env(env) do
env =
Enum.map(env, fn {key, value} ->
{to_charlist(key), to_charlist(value)}
end)
{:ok, env}
end
defp validate_opts_fields(opts) do
{_, additional_opts} = Keyword.split(opts, [:cd, :env])
if Enum.empty?(additional_opts) do
:ok
else
{:error, "invalid opts: #{inspect(additional_opts)}"}
end
end
defp normalize_args(cmd_with_args, opts) do
with {:ok, cmd} <- normalize_cmd(cmd_with_args),
{:ok, args} <- normalize_cmd_args(cmd_with_args),
:ok <- validate_opts_fields(opts),
{:ok, cd} <- normalize_cd(opts[:cd]),
{:ok, env} <- normalize_env(opts[:env]) do
{:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env}}
end
end
defp normalize_args(_, _), do: {:error, "invalid arguments"}
@spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
defp exec(cmd_with_args, socket_path, env, cd) do
opts = []
opts = if cd, do: [{:cd, cd} | opts], else: []
opts = if env, do: [{:env, env} | opts], else: opts
opts =
[
:nouse_stdio,
:exit_status,
:binary,
args: [socket_path | cmd_with_args]
] ++ opts
Port.open({:spawn_executable, @spawner_path}, opts)
end
defp socket_path do
str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
path = Path.join(System.tmp_dir!(), str)
_ = :file.delete(path)
path
end
defp signal(port, sig) when sig in [:sigkill, :sigterm] do
case Port.info(port, :os_pid) do
{:os_pid, os_pid} -> Nif.nif_kill(os_pid, sig)
:undefined -> {:error, :process_not_alive}
end
end
defp create_unix_domain_socket!(socket_path) do
{:ok, uds} = :socket.open(:local, :stream, :default)
{:ok, _} = :socket.bind(uds, %{family: :local, path: socket_path})
:ok = :socket.listen(uds)
uds
end
@socket_timeout 2000
defp receive_file_descriptors!(uds) do
with {:ok, sock} <- :socket.accept(uds, @socket_timeout),
{:ok, msg} <- :socket.recvmsg(sock) do
%{
ctrl: [
%{
data: <<read_fd_int::native-32, write_fd_int::native-32, _rest::binary>>,
level: :socket,
type: :rights
}
]
} = msg
:socket.close(sock)
with {:ok, write_fd} <- Nif.nif_create_fd(write_fd_int),
{:ok, read_fd} <- Nif.nif_create_fd(read_fd_int) do
{write_fd, read_fd}
else
error ->
raise Error,
message: "Failed to create fd resources\n error: #{inspect(error)}"
end
else
error ->
raise Error,
message:
"Failed to receive stdin and stdout file descriptors\n error: #{inspect(error)}"
end
end
end
diff --git a/lib/exile/process_nif.ex b/lib/exile/process_nif.ex
index 2b67682..7607274 100644
--- a/lib/exile/process_nif.ex
+++ b/lib/exile/process_nif.ex
@@ -1,30 +1,21 @@
defmodule Exile.ProcessNif do
@moduledoc false
@on_load :load_nifs
def load_nifs do
nif_path = :filename.join(:code.priv_dir(:exile), "exile")
:erlang.load_nif(nif_path, 0)
end
def nif_is_os_pid_alive(_os_pid), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_kill(_os_pid, _signal), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_read(_fd, _request), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_create_fd(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_close(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_write(_fd, _bin), do: :erlang.nif_error(:nif_library_not_loaded)
-
- # non-nif helper functions
- defmacro fork_exec_failure(), do: 125
-
- defmacro nif_false(), do: 0
- defmacro nif_true(), do: 1
-
- def to_process_fd(:stdin), do: 0
- def to_process_fd(:stdout), do: 1
end
diff --git a/lib/exile/watcher.ex b/lib/exile/watcher.ex
index 070a7ea..acf7a3b 100644
--- a/lib/exile/watcher.ex
+++ b/lib/exile/watcher.ex
@@ -1,77 +1,79 @@
defmodule Exile.Watcher do
+ @moduledoc false
+
use GenServer, restart: :temporary
require Logger
alias Exile.ProcessNif, as: Nif
def start_link(args) do
{:ok, _pid} = GenServer.start_link(__MODULE__, args)
end
def watch(pid, os_pid, socket_path) do
spec = {Exile.Watcher, %{pid: pid, os_pid: os_pid, socket_path: socket_path}}
DynamicSupervisor.start_child(Exile.WatcherSupervisor, spec)
end
def init(args) do
%{pid: pid, os_pid: os_pid, socket_path: socket_path} = args
Process.flag(:trap_exit, true)
ref = Elixir.Process.monitor(pid)
{:ok, %{pid: pid, os_pid: os_pid, socket_path: socket_path, ref: ref}}
end
def handle_info({:DOWN, ref, :process, pid, _reason}, %{
pid: pid,
socket_path: socket_path,
os_pid: os_pid,
ref: ref
}) do
File.rm!(socket_path)
attempt_graceful_exit(os_pid)
{:stop, :normal, nil}
end
def handle_info({:EXIT, _, reason}, nil), do: {:stop, reason, nil}
# when watcher is attempted to be killed, we forcefully kill external os process.
# This can happen when beam receive SIGTERM
def handle_info({:EXIT, _, reason}, %{pid: pid, socket_path: socket_path, os_pid: os_pid}) do
Logger.debug(fn -> "Watcher exiting. reason: #{inspect(reason)}" end)
File.rm!(socket_path)
Exile.Process.stop(pid)
attempt_graceful_exit(os_pid)
{:stop, reason, nil}
end
defp attempt_graceful_exit(os_pid) do
try do
Logger.debug(fn -> "Stopping external program" end)
# at max we wait for 100ms for program to exit
process_exit?(os_pid, 100) && throw(:done)
Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
Nif.nif_kill(os_pid, :sigterm)
process_exit?(os_pid, 100) && throw(:done)
Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
Nif.nif_kill(os_pid, :sigkill)
process_exit?(os_pid, 1000) && throw(:done)
Logger.error("failed to kill external process")
raise "Failed to kill external process"
catch
:done -> Logger.debug(fn -> "External program exited successfully" end)
end
end
defp process_exit?(os_pid), do: !Nif.nif_is_os_pid_alive(os_pid)
defp process_exit?(os_pid, timeout) do
if process_exit?(os_pid) do
true
else
:timer.sleep(timeout)
process_exit?(os_pid)
end
end
end

File Metadata

Mime Type
text/x-diff
Expires
Thu, Nov 28, 8:33 AM (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
41004
Default Alt Text
(31 KB)

Event Timeline