Page MenuHomePhorge

No OneTemporary

Size
57 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/exile.ex b/lib/exile.ex
index 55191af..98310f7 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,140 +1,132 @@
defmodule Exile do
- @moduledoc """
+ @moduledoc ~S"""
Exile is an alternative for beam [ports](https://hexdocs.pm/elixir/Port.html)
with back-pressure and non-blocking IO.
- ## Comparison with Port
+ ### Quick Start
- * it is demand driven. User explicitly has to `read` the command
- output, and the progress of the external command is controlled
- using OS pipes. Exile never load more output than we can consume,
- so we should never experience memory issues
-
- * it can close stdin while consuming output
+ ```
- * tries to handle zombie process by attempting to cleanup
- external process. Note that there is no middleware involved
- with exile so it is still possible to endup with zombie process.
+ ```
- * selectively consume stdout and stderr
+ For more details about stream API, see `Exile.stream!`.
- Internally Exile uses non-blocking asynchronous system calls
- to interact with the external process. It does not use port's
- message based communication, instead uses raw stdio and NIF.
- Uses asynchronous system calls for IO. Most of the system
- calls are non-blocking, so it should not block the beam
- schedulers. Make use of dirty-schedulers for IO
+ For more details about inner working, please check `Exile.Process`
+ documentation.
"""
use Application
@doc false
def start(_type, _args) do
opts = [
name: Exile.WatcherSupervisor,
strategy: :one_for_one
]
# We use DynamicSupervisor for cleaning up external processes on
# :init.stop or SIGTERM
DynamicSupervisor.start_link(opts)
end
- @doc """
- Runs the command with arguments and return an Enumerable to read the output.
+ @doc ~S"""
+ Runs the command with arguments and return an the stdout as lazily
+ Enumerable stream, similar to [`Stream`](https://hexdocs.pm/elixir/Stream.html).
First parameter must be a list containing command with arguments.
- example: `["cat", "file.txt"]`.
+ Example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* Enumerable:
```
# List
Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
# Stream
Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65_536)) |> Enum.to_list()
```
* Collectable:
If the input in a function with arity 1, Exile will call that function
with a `Collectable` as the argument. The function must *push* input to this
collectable. Return value of the function is ignored.
```
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
- By defaults no input is sent to the command
+ By defaults no input is sent to the command.
* `exit_timeout` - Duration to wait for external program to exit after completion
- before raising an error. Defaults to `:infinity`
+ (when stream ends). Defaults to `:infinity`
* `max_chunk_size` - Maximum size of iodata chunk emitted by the stream.
Chunk size can be less than the `max_chunk_size` depending on the amount of
data available to be read. Defaults to `65_535`
* `enable_stderr` - When set to true, output stream will contain stderr data along
with stdout. Stream data will be of the form `{:stdout, iodata}` or `{:stderr, iodata}`
to differentiate different streams. Defaults to false. See example below
* `ignore_epipe` - when set to true, `EPIPE` error during the write will be ignored.
This can be used to match UNIX shell default behaviour. EPIPE is the error raised
when the reader finishes the reading and close output pipe before command completes.
Defaults to `false`.
Remaining options are passed to `Exile.Process.start_link/2`
### Examples
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
Stream with stderr
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1),
input: File.stream!("music_video.mkv", [], 65_535),
enable_stderr: true
)
|> Stream.transform(
fn ->
File.open!("music.mp3", [:write, :binary])
end,
fn elem, file ->
case elem do
{:stdout, data} ->
+ # write stdout data to a file
:ok = IO.binwrite(file, data)
{:stderr, msg} ->
+ # write stderr output to console
:ok = IO.write(msg)
end
{[], file}
end,
fn file ->
:ok = File.close(file)
end
)
|> Stream.run()
```
"""
@type collectable_func() :: (Collectable.t() -> any())
@spec stream!(nonempty_list(String.t()),
input: Enum.t() | collectable_func(),
exit_timeout: timeout(),
max_chunk_size: pos_integer()
) :: Exile.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index 36e4163..4dfb8e1 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,564 +1,860 @@
defmodule Exile.Process do
- @moduledoc """
+ @moduledoc ~S"""
GenServer which wraps spawned external command.
- `Exile.stream!/1` should be preferred over using this. Use this only
- if you need more control over the life-cycle of IO streams and OS
- process.
+ Use `Exile.stream!/1` over using this. Use this only if you are
+ familiar with life-cycle and need more control of the IO streams
+ and OS process.
## Comparison with Port
* it is demand driven. User explicitly has to `read` the command
output, and the progress of the external command is controlled
using OS pipes. Exile never load more output than we can consume,
so we should never experience memory issues
* it can close stdin while consuming output
* tries to handle zombie process by attempting to cleanup
external process. Note that there is no middleware involved
with exile so it is still possible to endup with zombie process.
* selectively consume stdout and stderr
Internally Exile uses non-blocking asynchronous system calls
to interact with the external process. It does not use port's
message based communication, instead uses raw stdio and NIF.
Uses asynchronous system calls for IO. Most of the system
calls are non-blocking, so it should not block the beam
schedulers. Make use of dirty-schedulers for IO
+
+ ## Introduction
+
+ `Exile.Process` is a process based wrapper around the external
+ process. It is similar to `port` as an entity but the interface is
+ different. All communication with the external process must happen
+ via `Exile.Process` interface.
+
+ Exile process life-cycle tied to external process and owners. All
+ system resources such are open file-descriptors, external process
+ are cleaned up when the `Exile.Process` dies.
+
+ ### Owner
+
+ Each `Exile.Process` has an owner. And it will be the process which
+ created it (via `Exile.Process.start_link/2`). Process owner can not
+ be changed.
+
+ Owner process will be linked to the `Exile.Process`. So when the
+ exile process is dies abnormally the owner will be killed too or
+ visa-versa. Owner process should avoid trapping the exit signal, if
+ you want avoid the caller getting killed, create a separate process
+ as owner to run the command and monitor that process.
+
+ Only owner can get the exit status of the command, using
+ `Exile.Process.await_exit/2`. All exile processes **MUST** be
+ awaited. Exit status or reason is **ALWAYS** sent to the owner. It
+ is similar to [`Task`](https://hexdocs.pm/elixir/Task.html). If the
+ owner exit without `await_exit`, the exile process will be killed,
+ but if the owner continue without `await_exit` then the exile
+ process will linger around till the process exit.
+
+ ```
+ iex> alias Exile.Process
+ iex> {:ok, p} = Process.start_link(~w(echo hello))
+ iex> Process.read(p, 100)
+ {:ok, "hello\n"}
+ iex> Process.read(p, 100) # read till we get :eof
+ :eof
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ ### Pipe & Pipe Owner
+
+ Standard IO pipes/channels/streams of the external process such as
+ STDIN, STDOUT, STDERR are called as Pipes. User can either write or
+ read data from pipes.
+
+ Each pipe has an owner process and only that process can write or
+ read from the exile process. By default the process who created the
+ exile process is the owner of all the pipes. Pipe owner can be
+ changed using `Exile.Process.change_pipe_owner/3`.
+
+ Pipe owner is monitored and the pipes are closed automatically when
+ the pipe owner exit. Pipe Owner can close the pipe early using
+ `Exile.Process.close_stdin/1` etc.
+
+ `Exile.Process.await_exit/2` closes all of the caller owned pipes by
+ default.
+
+ ```
+ iex> {:ok, p} = Process.start_link(~w(cat))
+ iex> writer = Task.async(fn ->
+ ...> :ok = Process.change_pipe_owner(p, :stdin, self())
+ ...> Process.write(p, "Hello World")
+ ...> end)
+ iex> Task.await(writer)
+ :ok
+ iex> Process.read(p, 100)
+ {:ok, "Hello World"}
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ ### Pipe Operations
+
+ Pipe owner can read or write date to the owned pipe. `:stderr` by
+ default is disabled, data written to stderr will appear on the
+ console. You can enable reading stderr by passing `enable_stderr:
+ true` during process creation.
+
+ Special function `Exile.Process.read_any/2` can be used to read
+ from either stdout or stderr whichever has the data available.
+
+ All Pipe operations blocks the caller to have blocking as natural
+ back-pressure and to make the API simple. This is an important
+ feature of Exile, that is the ability to block caller when the stdio
+ buffer is full, exactly similar to how programs works on the shell
+ with pipes between then `cat larg-file | grep "foo"`. Internally it
+ does not block the Exile process or VM (which is typically the case
+ with NIF calls). Because of this user can make concurrent read,
+ write to different pipes from separate processes. Internally Exile
+ uses asynchronous IO APIs to avoid blocking of VM or VM process.
+
+ Reading from stderr
+
+ ```
+ # write "Hello" to stdout and "World" to stderr
+ iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n")
+ iex> {:ok, p} = Process.start_link(["sh", "-c", script], enable_stderr: true)
+ iex> Process.read(p, 100)
+ {:ok, "Hello\n"}
+ iex> Process.read_stderr(p, 100)
+ {:ok, "World\n"}
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ Reading using `read_any`
+
+ ```
+ # write "Hello" to stdout and "World" to stderr
+ iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n")
+ iex> {:ok, p} = Process.start_link(["sh", "-c", script], enable_stderr: true)
+ iex> Process.read_any(p)
+ {:ok, {:stdout, "Hello\n"}}
+ iex> Process.read_any(p)
+ {:ok, {:stderr, "World\n"}}
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ ### Process Termination
+
+ When owner does (normally or abnormally) the Exile process always
+ terminated irrespective of pipe status or process status. External
+ process get a chance to terminate gracefully, if that fail it will
+ be killed.
+
+ If owner calls `await_exit` then the owner owned pipes are closed
+ and we wait for external process to terminate, if the process
+ already terminated then call returns immediately with exit
+ status. Else command will be attempted to stop gracefully following
+ the exit sequence based on the timeout value (5s by default).
+
+ If owner calls `await_exit` with `timeout` as `:infinity` then
+ Exile does not attempt to forcefully stop the external command and
+ wait for command to exit on itself. The `await_exit` call can be blocked
+ indefinitely waiting for external process to terminate.
+
+ If external process exit on its own, exit status is collected and
+ Exile process will wait for owner to close pipes. Most commands exit
+ with pipes are closed, so just ensuring to close pipes when works is
+ done should be enough.
+
+ Example of process getting terminated by `SIGTERM` signal
+
+ ```
+ # sleep command does not watch for stdin or stdout, so closing the
+ # pipe does not terminate the sleep command.
+ iex> {:ok, p} = Process.start_link(~w(sleep 100000000)) # sleep indefinitely
+ iex> Process.await_exit(p, 100) # ensure `await_exit` finish within `100ms`. By default it waits for 5s
+ {:ok, 143} # 143 is the exit status when command exit due to SIGTERM
+ ```
+
+ ## Examples
+
+ Run a command without any input or output
+
+ ```
+ iex> {:ok, p} = Process.start_link(["sh", "-c", "exit 1"])
+ iex> Process.await_exit(p)
+ {:ok, 1}
+ ```
+
+ Single process reading and writing to the command
+
+ ```
+ # bc is a calculator, which reads from stdin and writes output to stdout
+ iex> {:ok, p} = Process.start_link(~w(bc))
+ iex> Process.write(p, "1 + 1\n") # there must be new-line to indicate the end of the input line
+ :ok
+ iex> Process.read(p)
+ {:ok, "2\n"}
+ iex> Process.write(p, "2 * 10 + 1\n")
+ :ok
+ iex> Process.read(p)
+ {:ok, "21\n"}
+ # We must close stdin to signal the `bc` command that we are done.
+ # since `await_exit` implicitly closes the pipes, in this case we don't have to
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ Running a command which flush the output on stdin close. This is not
+ supported by Erlang/Elixir ports.
+
+ ```
+ # `base64` command reads all input and writes encoded output when stdin is closed.
+ iex> {:ok, p} = Process.start_link(~w(base64))
+ iex> Process.write(p, "abcdef")
+ :ok
+ iex> Process.close_stdin(p) # we can selectively close stdin and read all output
+ :ok
+ iex> Process.read(p)
+ {:ok, "YWJjZGVm\n"}
+ iex> Process.read(p) # typically it is better to read till we receive :eof when we are not sure how big the output data size is
+ :eof
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ Read and write to pipes in separate processes
+
+ ```
+ iex> {:ok, p} = Process.start_link(~w(cat))
+ iex> writer = Task.async(fn ->
+ ...> :ok = Process.change_pipe_owner(p, :stdin, self())
+ ...> Process.write(p, "Hello World")
+ ...> # no need to close the pipe explicitly here. Pipe will be closed automatically when process exit
+ ...> end)
+ iex> reader = Task.async(fn ->
+ ...> :ok = Process.change_pipe_owner(p, :stdout, self())
+ ...> Process.read(p)
+ ...> end)
+ iex> :timer.sleep(500) # wait for the reader and writer to change pipe owner, otherwise `await_exit` will close the pipes before we change pipe owner
+ iex> Process.await_exit(p, :infinity) # let the reader and writer take indefinite time to finish
+ {:ok, 0}
+ iex> Task.await(writer)
+ :ok
+ iex> Task.await(reader)
+ {:ok, "Hello World"}
+ ```
+
"""
use GenServer
alias Exile.Process.Exec
alias Exile.Process.Nif
alias Exile.Process.Operations
alias Exile.Process.Pipe
alias Exile.Process.State
require Logger
defmodule Error do
defexception [:message]
end
@type pipe_name :: :stdin | :stdout | :stderr
@type t :: %__MODULE__{
monitor_ref: reference(),
exit_ref: reference(),
pid: pid | nil,
owner: pid
}
defstruct [:monitor_ref, :exit_ref, :pid, :owner]
@type exit_status :: non_neg_integer
@default_opts [env: [], enable_stderr: false]
@default_buffer_size 65_535
@os_signal_timeout 1000
@doc """
- Starts `Exile.Process`
+ Starts `Exile.Process` server.
Starts external program using `cmd_with_args` with options `opts`
- `cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.
+ `cmd_with_args` must be a list containing command with arguments.
+ example: `["cat", "file.txt"]`.
### Options
+
* `cd` - the directory to run the command in
- * `env` - a list of tuples containing environment key-value. These can be accessed in the external program
- * `enable_stderr` - when set to true, Exile connects stderr pipe for the consumption. Defaults to false. Note that when set to true stderr must be consumed to avoid external program from blocking
+
+ * `env` - a list of tuples containing environment key-value.
+ These can be accessed in the external program
+
+ * `enable_stderr` - when set to true, Exile connects stderr
+ pipe for the consumption. Defaults to false. Note that when set
+ to true stderr must be consumed to avoid external program from blocking.
+
+ Caller of the process will be the owner owner of the Exile Process.
+ And default owner of all opened pipes.
+
+ Please check module documentation for more details
"""
@spec start_link(nonempty_list(String.t()),
cd: String.t(),
env: [{String.t(), String.t()}],
enable_stderr: boolean()
) :: {:ok, t} | {:error, any()}
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
case Exec.normalize_exec_args(cmd_with_args, opts) do
{:ok, args} ->
owner = self()
exit_ref = make_ref()
args = Map.merge(args, %{owner: owner, exit_ref: exit_ref})
{:ok, pid} = GenServer.start_link(__MODULE__, args)
ref = Process.monitor(pid)
process = %__MODULE__{
pid: pid,
monitor_ref: ref,
exit_ref: exit_ref,
owner: owner
}
{:ok, process}
error ->
error
end
end
@doc """
- Closes external program's standard input pipe (stdin)
+ Closes external program's standard input pipe (stdin).
+
+ Only owner of the pipe can close the pipe. This call will return
+ immediately.
"""
- @spec close_stdin(t) :: :ok | {:error, any()}
+ @spec close_stdin(t) :: :ok | {:error, :pipe_closed_or_invalid_caller} | {:error, any()}
def close_stdin(process) do
GenServer.call(process.pid, {:close_pipe, :stdin}, :infinity)
end
@doc """
Closes external program's standard output pipe (stdout)
+
+ Only owner of the pipe can close the pipe. This call will return
+ immediately.
"""
@spec close_stdout(t) :: :ok | {:error, any()}
def close_stdout(process) do
GenServer.call(process.pid, {:close_pipe, :stdout}, :infinity)
end
@doc """
Closes external program's standard error pipe (stderr)
+
+ Only owner of the pipe can close the pipe. This call will return
+ immediately.
"""
@spec close_stderr(t) :: :ok | {:error, any()}
def close_stderr(process) do
GenServer.call(process.pid, {:close_pipe, :stderr}, :infinity)
end
@doc """
- Writes iodata `data` to program's standard input pipe
+ Writes iodata `data` to external program's standard input pipe.
- This blocks when the pipe is full
+ This call blocks when the pipe is full. Returns `:ok` when
+ the complete data is written.
"""
@spec write(t, binary) :: :ok | {:error, any()}
def write(process, iodata) do
binary = IO.iodata_to_binary(iodata)
GenServer.call(process.pid, {:write_stdin, binary}, :infinity)
end
@doc """
Returns bytes from executed command's stdout with maximum size `max_size`.
- Blocks if no bytes are written to stdout yet. And returns as soon as bytes are available
+ Blocks if no data present in stdout pipe yet. And returns as soon as
+ data of any size is available.
+
+ Note that `max_size` is the maximum size of the returned data. But
+ the returned data can be less than that depending on how the program
+ flush the data etc.
"""
@spec read(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
def read(process, max_size \\ @default_buffer_size)
when is_integer(max_size) and max_size > 0 do
GenServer.call(process.pid, {:read_stdout, max_size}, :infinity)
end
@doc """
Returns bytes from executed command's stderr with maximum size `max_size`.
+ Pipe must be enabled with `enable_stderr: true` to read the data.
+
+ Blocks if no bytes are written to stderr yet. And returns as soon as
+ bytes are available
- Blocks if no bytes are written to stderr yet. And returns as soon as bytes are available
+ Note that `max_size` is the maximum size of the returned data. But
+ the returned data can be less than that depending on how the program
+ flush the data etc.
"""
@spec read_stderr(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
def read_stderr(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
GenServer.call(process.pid, {:read_stderr, size}, :infinity)
end
@doc """
- Returns bytes from either stdout or stderr with maximum size `max_size` whichever is available.
+ Returns bytes from either stdout or stderr with maximum size
+ `max_size` whichever is available at that time.
+
+ Blocks if no bytes are written to stdout or stderr yet. And returns
+ as soon as data is available.
- Blocks if no bytes are written to stdout/stderr yet. And returns as soon as bytes are available
+ Note that `max_size` is the maximum size of the returned data. But
+ the returned data can be less than that depending on how the program
+ flush the data etc.
"""
@spec read_any(t, pos_integer()) ::
{:ok, {:stdout, iodata}} | {:ok, {:stderr, iodata}} | :eof | {:error, any()}
def read_any(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
GenServer.call(process.pid, {:read_stdout_or_stderr, size}, :infinity)
end
+ @doc """
+ Changes the Pipe owner of the pipe to specified pid.
+
+ Note that currently any process can change the pipe owner.
+
+ For more details about Pipe Owner, please check module docs.
+ """
@spec change_pipe_owner(t, pipe_name, pid) :: :ok | {:error, any()}
def change_pipe_owner(process, pipe_name, target_owner_pid) do
GenServer.call(
process.pid,
{:change_pipe_owner, pipe_name, target_owner_pid},
:infinity
)
end
@doc """
- Sends signal to external program
+ Sends an system signal to external program
+
+ Note that `:sigkill` kills the program unconditionally.
+
+ Avoid sending signals manually, use `await_exit` instead.
"""
@spec kill(t, :sigkill | :sigterm) :: :ok
def kill(process, signal) when signal in [:sigkill, :sigterm] do
GenServer.call(process.pid, {:kill, signal}, :infinity)
end
@doc """
- Waits for the program to terminate.
+ Wait for the program to terminate and get exit status.
- If the program terminates before timeout, it returns `{:ok, exit_status}`
+ **ONLY** the Process owner can call this function. And all Exile
+ **process MUST** be awaited (Similar to Task).
+
+ Exile first politely asks the program to terminate by closing the
+ pipes owned by the process owner (by default process owner is the
+ pipes owner). Most programs terminates when standard pipes are
+ closed.
+
+ If you have changed the pipe owner to other process, you have to
+ close pipe yourself or wait for the program to exit.
+
+ If the program fails to terminate within the timeout (default 5s)
+ then the program will be killed using the exit sequence by sending
+ `SIGTERM`, `SIGKILL` signals in sequence.
+
+ When timeout is set to `:infinity` `await_exit` wait for the
+ programs to terminate indefinitely.
+
+ For more details check module documentation.
"""
@spec await_exit(t, timeout :: timeout()) :: {:ok, exit_status}
def await_exit(process, timeout \\ 5000) do
%__MODULE__{
monitor_ref: monitor_ref,
exit_ref: exit_ref,
owner: owner,
pid: pid
} = process
if self() != owner do
raise ArgumentError,
"task #{inspect(process)} exit status can only be queried by owner but was queried from #{inspect(self())}"
end
graceful_exit_timeout =
if timeout == :infinity do
:infinity
else
# process exit steps should finish before receive timeout exceeds
# receive timeout is max allowed time for the `await_exit` call to block
max(0, timeout - 100)
end
:ok = GenServer.cast(pid, {:prepare_exit, owner, graceful_exit_timeout})
receive do
{^exit_ref, exit_status} ->
exit_status
{:DOWN, ^monitor_ref, _, _proc, reason} ->
exit({reason, {__MODULE__, :await_exit, [process, timeout]}})
after
# ideally we should never this this case since the process must
# be terminated before the timeout and we should have received
# `DOWN` message
timeout ->
exit({:timeout, {__MODULE__, :await_exit, [process, timeout]}})
end
end
@doc """
Returns OS pid of the command
+
+ This is meant only for debugging. Avoid interacting with the
+ external process directly
"""
@spec os_pid(t) :: pos_integer()
def os_pid(process) do
GenServer.call(process.pid, :os_pid, :infinity)
end
## Server
@impl true
def init(args) do
{enable_stderr, args} = Map.pop(args, :enable_stderr)
{owner, args} = Map.pop!(args, :owner)
{exit_ref, args} = Map.pop!(args, :exit_ref)
state = %State{
args: args,
owner: owner,
status: :init,
enable_stderr: enable_stderr,
operations: Operations.new(),
exit_ref: exit_ref,
monitor_ref: Process.monitor(owner)
}
{:ok, state, {:continue, nil}}
end
@impl true
def handle_continue(nil, state) do
{:noreply, exec(state)}
end
@impl true
def handle_cast({:prepare_exit, caller, timeout}, state) do
state =
Enum.reduce(state.pipes, state, fn {_pipe_name, pipe}, state ->
case Pipe.close(pipe, caller) do
{:ok, pipe} ->
{:ok, state} = State.put_pipe(state, pipe.name, pipe)
state
{:error, _} ->
state
end
end)
case maybe_shutdown(state) do
{:stop, :normal, state} ->
{:stop, :normal, state}
{:noreply, state} ->
if timeout == :infinity do
{:noreply, state}
else
total_stages = 3
stage_timeout = div(timeout, total_stages)
handle_info({:prepare_exit, :normal_exit, stage_timeout}, state)
end
end
end
@impl true
def handle_call({:change_pipe_owner, pipe_name, new_owner}, _from, state) do
with {:ok, pipe} <- State.pipe(state, pipe_name),
{:ok, new_pipe} <- Pipe.set_owner(pipe, new_owner),
{:ok, state} <- State.put_pipe(state, pipe_name, new_pipe) do
{:reply, :ok, state}
else
{:error, _} = error ->
{:reply, error, state}
end
end
def handle_call({:close_pipe, pipe_name}, {caller, _} = from, state) do
with {:ok, pipe} <- State.pipe(state, pipe_name),
{:ok, new_pipe} <- Pipe.close(pipe, caller),
:ok <- GenServer.reply(from, :ok),
{:ok, new_state} <- State.put_pipe(state, pipe_name, new_pipe) do
maybe_shutdown(new_state)
else
{:error, _} = ret ->
{:reply, ret, state}
end
end
def handle_call({:read_stdout, size}, from, state) do
case Operations.read(state, {:read_stdout, from, size}) do
{:noreply, state} ->
{:noreply, state}
ret ->
{:reply, ret, state}
end
end
def handle_call({:read_stderr, size}, from, state) do
case Operations.read(state, {:read_stderr, from, size}) do
{:noreply, state} ->
{:noreply, state}
ret ->
{:reply, ret, state}
end
end
def handle_call({:read_stdout_or_stderr, size}, from, state) do
case Operations.read_any(state, {:read_stdout_or_stderr, from, size}) do
{:noreply, state} ->
{:noreply, state}
ret ->
{:reply, ret, state}
end
end
def handle_call({:write_stdin, binary}, from, state) do
case Operations.write(state, {:write_stdin, from, binary}) do
{:noreply, state} ->
{:noreply, state}
ret ->
{:reply, ret, state}
end
end
def handle_call(:os_pid, _from, state) do
case Port.info(state.port, :os_pid) do
{:os_pid, os_pid} ->
{:reply, {:ok, os_pid}, state}
nil ->
Logger.debug("Process not alive")
{:reply, :undefined, state}
end
end
def handle_call({:kill, signal}, _from, state) do
{:reply, signal(state.port, signal), state}
end
@impl true
def handle_info({:prepare_exit, current_stage, timeout}, %{status: status, port: port} = state) do
cond do
status != :running ->
{:noreply, state}
current_stage == :normal_exit ->
Elixir.Process.send_after(self(), {:prepare_exit, :sigterm, timeout}, timeout)
{:noreply, state}
current_stage == :sigterm ->
signal(port, :sigterm)
Elixir.Process.send_after(self(), {:prepare_exit, :sigkill, timeout}, timeout)
{:noreply, state}
current_stage == :sigkill ->
signal(port, :sigkill)
Elixir.Process.send_after(self(), {:prepare_exit, :stop, timeout}, timeout)
{:noreply, state}
# this should never happen, since sigkill signal can not be ignored by the OS process
current_stage == :stop ->
{:stop, :sigkill_timeout, state}
end
end
def handle_info({:select, write_resource, _ref, :ready_output}, state) do
:stdin = State.pipe_name_for_fd(state, write_resource)
with {:ok, {:write_stdin, from, _bin} = operation, state} <-
State.pop_operation(state, :write_stdin) do
case Operations.write(state, operation) do
{:noreply, state} ->
{:noreply, state}
ret ->
GenServer.reply(from, ret)
{:noreply, state}
end
end
end
def handle_info({:select, read_resource, _ref, :ready_input}, state) do
pipe_name = State.pipe_name_for_fd(state, read_resource)
with {:ok, operation_name} <- Operations.match_pending_operation(state, pipe_name),
{:ok, {_, from, _} = operation, state} <- State.pop_operation(state, operation_name) do
ret =
case operation_name do
:read_stdout_or_stderr ->
Operations.read_any(state, operation)
name when name in [:read_stdout, :read_stderr] ->
Operations.read(state, operation)
end
case ret do
{:noreply, state} ->
{:noreply, state}
ret ->
GenServer.reply(from, ret)
{:noreply, state}
end
else
{:error, _error} ->
{:noreply, state}
end
end
def handle_info({:stop, :sigterm}, state) do
if state.status == :running do
signal(state.port, :sigkill)
Elixir.Process.send_after(self(), {:stop, :sigkill}, @os_signal_timeout)
end
{:noreply, state}
end
def handle_info({:stop, :sigkill}, state) do
if state.status == :running do
# this should never happen, since sigkill signal can not be handled
{:stop, :sigkill_timeout, state}
else
{:noreply, state}
end
end
def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state) do
send(state.owner, {state.exit_ref, {:ok, exit_status}})
state = State.set_status(state, {:exit, exit_status})
maybe_shutdown(state)
end
# we are only interested in Port exit signals
def handle_info({:EXIT, port, reason}, %State{port: port} = state) when reason != :normal do
send(state.owner, {state.exit_ref, {:error, reason}})
state = State.set_status(state, {:exit, {:error, reason}})
maybe_shutdown(state)
end
def handle_info({:EXIT, port, :normal}, %{port: port} = state) do
maybe_shutdown(state)
end
# shutdown unconditionally when process owner exit normally.
# Since Exile process is linked to the owner, in case of owner crash,
# exile process will be killed by the VM.
def handle_info(
{:DOWN, owner_ref, :process, _pid, reason},
%State{monitor_ref: owner_ref} = state
) do
{:stop, reason, state}
end
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
state =
Enum.reduce(state.pipes, state, fn {_pipe_name, pipe}, state ->
case Pipe.close(pipe, pid) do
{:ok, pipe} ->
{:ok, state} = State.put_pipe(state, pipe.name, pipe)
state
{:error, _} ->
state
end
end)
maybe_shutdown(state)
end
@type signal :: :sigkill | :sigterm
@spec signal(port, signal) :: :ok | {:error, :invalid_signal} | {:error, :process_not_alive}
defp signal(port, signal) do
with true <- signal in [:sigkill, :sigterm],
{:os_pid, os_pid} <- Port.info(port, :os_pid) do
Nif.nif_kill(os_pid, signal)
else
false ->
{:error, :invalid_signal}
nil ->
{:error, :process_not_alive}
end
end
@spec maybe_shutdown(State.t()) :: {:stop, :normal, State.t()} | {:noreply, State.t()}
defp maybe_shutdown(state) do
open_pipes_count =
state.pipes
|> Map.values()
|> Enum.count(&Pipe.open?/1)
if open_pipes_count == 0 && !(state.status in [:init, :running]) do
{:stop, :normal, state}
else
{:noreply, state}
end
end
@spec exec(State.t()) :: State.t()
defp exec(state) do
%{
port: port,
stdin: stdin_fd,
stdout: stdout_fd,
stderr: stderr_fd
} = Exec.start(state.args, state.enable_stderr)
stderr =
if state.enable_stderr do
Pipe.new(:stderr, stderr_fd, state.owner)
else
Pipe.new(:stderr)
end
%State{
state
| port: port,
status: :running,
pipes: %{
stdin: Pipe.new(:stdin, stdin_fd, state.owner),
stdout: Pipe.new(:stdout, stdout_fd, state.owner),
stderr: stderr
}
}
end
end
diff --git a/lib/exile/process/exec.ex b/lib/exile/process/exec.ex
index ea6e104..d22334c 100644
--- a/lib/exile/process/exec.ex
+++ b/lib/exile/process/exec.ex
@@ -1,195 +1,211 @@
defmodule Exile.Process.Exec do
@moduledoc false
alias Exile.Process.Nif
+ alias Exile.Process.Pipe
@type args :: %{
cmd_with_args: [String.t()],
cd: String.t(),
env: [{String.t(), String.t()}]
}
@spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
@spec start(args, boolean()) :: %{
port: port,
stdin: non_neg_integer(),
stdout: non_neg_integer(),
stderr: non_neg_integer()
}
def start(
%{
cmd_with_args: cmd_with_args,
cd: cd,
env: env
},
enable_stderr
) do
socket_path = socket_path()
{:ok, sock} = :socket.open(:local, :stream, :default)
try do
:ok = socket_bind(sock, socket_path)
:ok = :socket.listen(sock)
spawner_cmdline_args = [socket_path, to_string(enable_stderr) | cmd_with_args]
port_opts =
[:nouse_stdio, :exit_status, :binary, args: spawner_cmdline_args] ++
prune_nils(env: env, cd: cd)
port = Port.open({:spawn_executable, @spawner_path}, port_opts)
{:os_pid, os_pid} = Port.info(port, :os_pid)
Exile.Watcher.watch(self(), os_pid, socket_path)
{stdin_fd, stdout_fd, stderr_fd} = receive_fds(sock, enable_stderr)
%{port: port, stdin: stdin_fd, stdout: stdout_fd, stderr: stderr_fd}
after
:socket.close(sock)
File.rm!(socket_path)
end
end
+ @spec normalize_exec_args(nonempty_list(), keyword()) ::
+ {:ok, %{cmd_with_args: nonempty_list(), cd: charlist, env: env, enable_stderr: boolean}}
+ | {:error, String.t()}
def normalize_exec_args(cmd_with_args, opts) do
with {:ok, cmd} <- normalize_cmd(cmd_with_args),
{:ok, args} <- normalize_cmd_args(cmd_with_args),
:ok <- validate_opts_fields(opts),
{:ok, cd} <- normalize_cd(opts[:cd]),
{:ok, enable_stderr} <- normalize_enable_stderr(opts[:enable_stderr]),
{:ok, env} <- normalize_env(opts[:env]) do
{:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, enable_stderr: enable_stderr}}
end
end
@socket_timeout 2000
+ @spec receive_fds(:socket.socket(), boolean) :: {Pipe.fd(), Pipe.fd(), Pipe.fd()}
defp receive_fds(lsock, enable_stderr) do
{:ok, sock} = :socket.accept(lsock, @socket_timeout)
try do
{:ok, msg} = :socket.recvmsg(sock, @socket_timeout)
%{ctrl: [%{data: data, level: :socket, type: :rights}]} = msg
<<stdin_fd::native-32, stdout_fd::native-32, stderr_fd::native-32, _::binary>> = data
# FDs are managed by the NIF resource life-cycle
{:ok, stdout} = Nif.nif_create_fd(stdout_fd)
{:ok, stdin} = Nif.nif_create_fd(stdin_fd)
{:ok, stderr} =
if enable_stderr do
Nif.nif_create_fd(stderr_fd)
else
{:ok, nil}
end
{stdin, stdout, stderr}
after
:socket.close(sock)
end
end
+ # skip type warning till we change min OTP version to 24.
+ @dialyzer {:nowarn_function, socket_bind: 2}
defp socket_bind(sock, path) do
case :socket.bind(sock, %{family: :local, path: path}) do
:ok -> :ok
# for compatibility with OTP version < 24
{:ok, _} -> :ok
other -> other
end
end
+ @spec socket_path() :: String.t()
defp socket_path do
str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
path = Path.join(System.tmp_dir!(), str)
_ = :file.delete(path)
path
end
+ @spec prune_nils(keyword()) :: keyword()
defp prune_nils(kv) do
Enum.reject(kv, fn {_, v} -> is_nil(v) end)
end
+ @spec normalize_cmd(nonempty_list()) :: {:ok, nonempty_list()} | {:error, binary()}
defp normalize_cmd(arg) do
case arg do
[cmd | _] when is_binary(cmd) ->
path = System.find_executable(cmd)
if path do
{:ok, to_charlist(path)}
else
{:error, "command not found: #{inspect(cmd)}"}
end
_ ->
{:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
end
end
defp normalize_cmd_args([_ | args]) do
- if is_list(args) && Enum.all?(args, &is_binary/1) do
+ if Enum.all?(args, &is_binary/1) do
{:ok, Enum.map(args, &to_charlist/1)}
else
{:error, "command arguments must be list of strings. #{inspect(args)}"}
end
end
+ @spec normalize_cd(binary) :: {:ok, charlist()} | {:error, String.t()}
defp normalize_cd(cd) do
case cd do
nil ->
{:ok, ''}
cd when is_binary(cd) ->
if File.exists?(cd) && File.dir?(cd) do
{:ok, to_charlist(cd)}
else
{:error, "`:cd` must be valid directory path"}
end
_ ->
{:error, "`:cd` must be a binary string"}
end
end
+ @type env :: list({String.t(), String.t()})
+
+ @spec normalize_env(env) :: {:ok, env} | {:error, String.t()}
defp normalize_env(env) do
case env do
nil ->
{:ok, []}
env when is_list(env) or is_map(env) ->
env =
Enum.map(env, fn {key, value} ->
{to_charlist(key), to_charlist(value)}
end)
{:ok, env}
_ ->
{:error, "`:env` must be a map or list of `{string, string}`"}
end
end
+ @spec normalize_enable_stderr(enable_stderr :: boolean) :: {:ok, boolean} | {:error, String.t()}
defp normalize_enable_stderr(enable_stderr) do
case enable_stderr do
nil ->
{:ok, false}
enable_stderr when is_boolean(enable_stderr) ->
{:ok, enable_stderr}
_ ->
{:error, ":enable_stderr must be a boolean"}
end
end
+ @spec validate_opts_fields(keyword) :: :ok | {:error, String.t()}
defp validate_opts_fields(opts) do
{_, additional_opts} = Keyword.split(opts, [:cd, :env, :enable_stderr])
if Enum.empty?(additional_opts) do
:ok
else
{:error, "invalid opts: #{inspect(additional_opts)}"}
end
end
end
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index 41bc27d..1fc4ba1 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,634 +1,636 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
alias Exile.Process
alias Exile.Process.{Pipe, State}
+ doctest Exile.Process
+
describe "pipes" do
test "reading from stdout" do
{:ok, s} = Process.start_link(~w(echo test))
:timer.sleep(100)
assert {:ok, iodata} = Process.read(s, 100)
assert :eof = Process.read(s, 100)
assert IO.iodata_to_binary(iodata) == "test\n"
assert :ok == Process.close_stdin(s)
assert :ok == Process.close_stdout(s)
assert {:ok, 0} == Process.await_exit(s, 500)
refute Elixir.Process.alive?(s.pid)
end
test "write to stdin" do
{:ok, s} = Process.start_link(~w(cat))
assert :ok == Process.write(s, "hello")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "hello"
assert :ok == Process.write(s, "world")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "world"
assert :ok == Process.close_stdin(s)
assert :eof == Process.read(s)
assert {:ok, 0} == Process.await_exit(s, 100)
:timer.sleep(100)
refute Elixir.Process.alive?(s.pid)
end
test "when stdin is closed" do
logger = start_events_collector()
# base64 produces output only after getting EOF from stdin. we
# collect events in order and assert that we can still read from
# stdout even after closing stdin
{:ok, s} = Process.start_link(~w(base64))
# parallel reader should be blocked till we close stdin
start_parallel_reader(s, logger)
:timer.sleep(100)
assert :ok == Process.write(s, "hello")
add_event(logger, {:write, "hello"})
assert :ok == Process.write(s, "world")
add_event(logger, {:write, "world"})
:timer.sleep(100)
assert :ok == Process.close_stdin(s)
add_event(logger, :input_close)
assert {:ok, 0} == Process.await_exit(s)
# Process.stop(s)
# wait for the reader to read
Elixir.Process.sleep(500)
assert [
{:write, "hello"},
{:write, "world"},
:input_close,
{:read, "aGVsbG93b3JsZA==\n"},
:eof
] == get_events(logger)
end
test "reading from stderr" do
{:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: true)
assert {:ok, "foo\n"} = Process.read_stderr(s, 100)
end
test "reading from stdout or stderr using read_any" do
script = """
echo "foo"
echo "bar" >&2
"""
{:ok, s} = Process.start_link(["sh", "-c", script], enable_stderr: true)
{:ok, ret1} = Process.read_any(s, 100)
{:ok, ret2} = Process.read_any(s, 100)
assert {:stderr, "bar\n"} in [ret1, ret2]
assert {:stdout, "foo\n"} in [ret1, ret2]
assert :eof = Process.read_any(s, 100)
end
test "reading from stderr_read when stderr disabled" do
{:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: false)
assert {:error, :pipe_closed_or_invalid_caller} = Process.read_stderr(s, 100)
end
test "read_any with stderr disabled" do
script = """
echo "foo"
echo "bar" >&2
"""
{:ok, s} = Process.start_link(["sh", "-c", script], enable_stderr: false)
{:ok, ret} = Process.read_any(s, 100)
# we can still read from stdout even if stderr is disabled
assert ret == {:stdout, "foo\n"}
assert :eof = Process.read_any(s, 100)
end
test "if pipe gets closed on pipe owner exit normally" do
{:ok, s} = Process.start_link(~w(sleep 10000))
writer =
Task.async(fn ->
Process.change_pipe_owner(s, :stdin, self())
end)
# stdin must be closed on task completion
:ok = Task.await(writer)
assert %State{
pipes: %{
stdin: %Pipe{
name: :stdin,
fd: _,
monitor_ref: nil,
owner: nil,
status: :closed
},
# ensure other pipes are unaffected
stdout: %Pipe{
name: :stdout,
status: :open
}
}
} = :sys.get_state(s.pid)
end
test "if pipe gets closed on pipe owner is killed" do
{:ok, s} = Process.start_link(~w(sleep 10000))
writer =
spawn(fn ->
Process.change_pipe_owner(s, :stdin, self())
receive do
:block -> :ok
end
end)
# wait for pipe owner to change
:timer.sleep(100)
# stdin must be closed on process kill
true = Elixir.Process.exit(writer, :kill)
:timer.sleep(1000)
assert %State{
pipes: %{
stdin: %Pipe{
name: :stdin,
fd: _,
monitor_ref: nil,
owner: nil,
status: :closed
},
# ensure other pipes are unaffected
stdout: %Pipe{
name: :stdout,
status: :open
}
}
} = :sys.get_state(s.pid)
end
end
describe "process termination" do
test "if external program terminates on process exit" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
:ok = Process.close_stdin(s)
:timer.sleep(100)
refute os_process_alive?(os_pid)
end
test "watcher kills external command on process without exit_await" do
{os_pid, s} =
Task.async(fn ->
{:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
# ensure the script set the correct signal handlers (handlers to ignore signal)
assert {:ok, "ignored signals\n"} = Process.read(s)
# exit without waiting for the exile process
{os_pid, s}
end)
|> Task.await()
:timer.sleep(500)
# Exile Process should exit after Task process terminates
refute Elixir.Process.alive?(s.pid)
refute os_process_alive?(os_pid)
end
test "await_exit with timeout" do
{:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
assert {:ok, "ignored signals\n"} = Process.read(s)
# attempt to kill the process after 100ms
assert {:ok, 137} = Process.await_exit(s, 100)
refute os_process_alive?(os_pid)
refute Elixir.Process.alive?(s.pid)
end
test "exit status" do
{:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
assert {:ok, 10} == Process.await_exit(s)
end
test "writing binary larger than pipe buffer size" do
large_bin = generate_binary(5 * 65_535)
{:ok, s} = Process.start_link(~w(cat))
writer =
Task.async(fn ->
Process.change_pipe_owner(s, :stdin, self())
Process.write(s, large_bin)
end)
:timer.sleep(100)
iodata =
Stream.unfold(nil, fn _ ->
case Process.read(s) do
{:ok, data} -> {data, nil}
:eof -> nil
end
end)
|> Enum.to_list()
Task.await(writer)
assert IO.iodata_length(iodata) == 5 * 65_535
assert {:ok, 0} == Process.await_exit(s, 500)
end
test "if exile process is terminated on owner exit even if pipe owner is alive" do
parent = self()
owner =
spawn(fn ->
# owner process terminated without await_exit
{:ok, s} = Process.start_link(~w(cat))
snd(parent, {:ok, s})
:exit = recv(parent)
end)
{:ok, s} = recv(owner)
spawn_link(fn ->
Process.change_pipe_owner(s, :stdin, self())
block()
end)
spawn_link(fn ->
Process.change_pipe_owner(s, :stdout, self())
block()
end)
# wait for pipe owner to change
:timer.sleep(500)
snd(owner, :exit)
# wait for messages to propagate, if there are any
:timer.sleep(500)
refute Elixir.Process.alive?(owner)
refute Elixir.Process.alive?(s.pid)
end
test "if exile process is *NOT* terminated on owner exit, if any pipe owner is alive" do
parent = self()
{:ok, s} = Process.start_link(~w(cat))
io_proc =
spawn_link(fn ->
:ok = Process.change_pipe_owner(s, :stdin, self())
:ok = Process.change_pipe_owner(s, :stdout, self())
recv(parent)
end)
# wait for pipe owner to change
:timer.sleep(100)
# external process will be killed with SIGTERM (143)
assert {:ok, 143} = Process.await_exit(s, 100)
# wait for messages to propagate, if there are any
:timer.sleep(100)
assert Elixir.Process.alive?(s.pid)
assert %State{
pipes: %{
stdin: %Pipe{status: :open},
stdout: %Pipe{status: :open}
}
} = :sys.get_state(s.pid)
# when the io_proc exits, the pipes should be closed and process must terminate
snd(io_proc, :exit)
:timer.sleep(100)
refute Elixir.Process.alive?(s.pid)
end
test "when process is killed with a pending concurrent write" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
large_data =
Stream.cycle(["test"])
|> Stream.take(500_000)
|> Enum.to_list()
|> IO.iodata_to_binary()
task =
Task.async(fn ->
Process.change_pipe_owner(s, :stdin, self())
Process.write(s, large_data)
end)
# to avoid race conditions, like if process is killed before owner
# is changed
:timer.sleep(200)
assert {:ok, 1} == Process.await_exit(s)
refute os_process_alive?(os_pid)
assert {:error, :epipe} == Task.await(task)
end
test "if owner is killed when the exile process is killed" do
parent = self()
# create an exile process without linking to caller
owner =
spawn(fn ->
assert {:ok, s} = Process.start_link(~w(cat))
snd(parent, s.pid)
block()
end)
owner_ref = Elixir.Process.monitor(owner)
exile_pid = recv(owner)
exile_ref = Elixir.Process.monitor(exile_pid)
assert Elixir.Process.alive?(owner)
assert Elixir.Process.alive?(exile_pid)
true = Elixir.Process.exit(exile_pid, :kill)
assert_receive {:DOWN, ^owner_ref, :process, ^owner, :killed}
assert_receive {:DOWN, ^exile_ref, :process, ^exile_pid, :killed}
end
test "if exile process is killed when the owner is killed" do
parent = self()
# create an exile process without linking to caller
owner =
spawn(fn ->
assert {:ok, s} = Process.start_link(~w(cat))
snd(parent, s.pid)
block()
end)
owner_ref = Elixir.Process.monitor(owner)
exile_pid = recv(owner)
exile_ref = Elixir.Process.monitor(exile_pid)
assert Elixir.Process.alive?(owner)
assert Elixir.Process.alive?(exile_pid)
true = Elixir.Process.exit(owner, :kill)
assert_receive {:DOWN, ^owner_ref, :process, ^owner, :killed}
assert_receive {:DOWN, ^exile_ref, :process, ^exile_pid, :killed}
end
end
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
{:ok, s} = Process.start_link(~w(cat))
large_bin = generate_binary(65_535 * 5)
writer =
Task.async(fn ->
Process.change_pipe_owner(s, :stdin, self())
:ok = Process.write(s, large_bin)
add_event(logger, {:write, IO.iodata_length(large_bin)})
end)
:timer.sleep(50)
reader =
Task.async(fn ->
Process.change_pipe_owner(s, :stdout, self())
Stream.unfold(nil, fn _ ->
case Process.read(s) do
{:ok, data} ->
add_event(logger, {:read, IO.iodata_length(data)})
# delay in reading should delay writes
:timer.sleep(10)
{nil, nil}
:eof ->
nil
end
end)
|> Stream.run()
end)
Task.await(writer)
Task.await(reader)
assert {:ok, 0} == Process.await_exit(s)
events = get_events(logger)
{write_events, read_evants} = Enum.split_with(events, &match?({:write, _}, &1))
assert Enum.sum(Enum.map(read_evants, fn {:read, size} -> size end)) ==
Enum.sum(Enum.map(write_events, fn {:write, size} -> size end))
# There must be a read before write completes
assert hd(events) == {:read, 65_535}
end
# this test does not work properly in linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
open_files = parse_lsof(bin)
assert [
%{type: "PIPE", fd: "0", name: _},
%{type: "PIPE", fd: "1", name: _},
%{type: "CHR", fd: "2", name: "/dev/ttys007"}
] = open_files
end
describe "options and validation" do
test "cd option" do
parent = Path.expand("..", File.cwd!())
{:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
{:ok, dir} = Process.read(s)
assert String.trim(dir) == parent
assert {:ok, 0} = Process.await_exit(s)
end
test "when cd is invalid" do
assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
end
test "when user pass invalid option" do
assert {:error, "invalid opts: [invalid: :test]"} =
Process.start_link(~w(cat), invalid: :test)
end
test "env option" do
assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
assert {:ok, "test\n"} = Process.read(s)
assert {:ok, 0} = Process.await_exit(s)
end
test "if external process inherits beam env" do
:ok = System.put_env([{"BEAM_ENV_A", "10"}])
assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
assert {:ok, "10\n"} = Process.read(s)
assert {:ok, 0} = Process.await_exit(s)
end
test "if user env overrides beam env" do
:ok = System.put_env([{"BEAM_ENV", "base"}])
assert {:ok, s} =
Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
assert {:ok, "overridden\n"} = Process.read(s)
assert {:ok, 0} = Process.await_exit(s)
end
end
def start_parallel_reader(process, logger) do
spawn_link(fn ->
:ok = Process.change_pipe_owner(process, :stdout, self())
reader_loop(process, logger)
end)
end
def reader_loop(process, logger) do
case Process.read(process) do
{:ok, data} ->
add_event(logger, {:read, data})
reader_loop(process, logger)
:eof ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
Stream.repeatedly(fn -> "A" end)
|> Enum.take(size)
|> IO.iodata_to_binary()
end
defp block do
rand = :rand.uniform()
receive do
^rand -> :ok
end
end
defp snd(pid, term) do
send(pid, {self(), term})
end
defp recv(sender) do
receive do
{^sender, term} -> term
after
1000 ->
raise "recv timeout"
end
end
end

File Metadata

Mime Type
text/x-diff
Expires
Wed, Nov 27, 8:23 AM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40584
Default Alt Text
(57 KB)

Event Timeline