Page MenuHomePhorge

No OneTemporary

Size
24 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index 018abd8..ffce3b8 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,470 +1,452 @@
defmodule Exile.Process do
@moduledoc """
GenServer which wraps spawned external command.
One should use `Exile.stream!` over `Exile.Process`. stream internally manages this server for you. Use this only if you need more control over the life-cycle OS process.
## Overview
`Exile.Process` is an alternative primitive for Port. It has different interface and approach to running external programs to solve the issues associated with the ports.
### When compared to Port
* it is demand driven. User explicitly has to `read` output of the command and the progress of the external command is controlled using OS pipes. so unlike Port, this never cause memory issues in beam by loading more than we can consume
* it can close stdin of the program explicitly
* does not create zombie process. It always tries to cleanup resources
At a high level, it makes non-blocking asynchronous system calls to execute and interact with the external program. It completely bypasses the BEAM implementation for the same by using a NIF. It uses the `select()` system call for asynchronous IO. Most of the system calls are non-blocking, so it does not have an adverse effect on the scheduler, avoiding issues such as "scheduler collapse".
"""
alias Exile.ProcessNif, as: Nif
require Logger
use GenServer
@default_opts [env: []]
@doc """
Starts an `Exile.Process` server that runs an external program.

`cmd_with_args` must be a list holding the command followed by its arguments, for example `["cat", "file.txt"]`. `opts` is merged over the default options before being validated.

Returns the result of `GenServer.start/2` when the arguments are valid, otherwise the `{:error, reason}` produced while normalizing them. Note that the server is started with `GenServer.start/2`, so it is not linked to the caller.

### Options
  * `cd` - the directory to run the command in
  * `env` - an enumerable of tuples containing environment key-value. These can be accessed in the external program
"""
def start_link(cmd_with_args, opts \\ []) do
  merged_opts = Keyword.merge(@default_opts, opts)

  case normalize_args(cmd_with_args, merged_opts) do
    {:ok, server_args} -> GenServer.start(__MODULE__, server_args)
    failure -> failure
  end
end
def close_stdin(process) do
GenServer.call(process, :close_stdin, :infinity)
end
def write(process, iodata) do
GenServer.call(process, {:write, IO.iodata_to_binary(iodata)}, :infinity)
end
def read(process, size) when (is_integer(size) and size > 0) or size == :unbuffered do
GenServer.call(process, {:read, size}, :infinity)
end
def read(process) do
GenServer.call(process, {:read, :unbuffered}, :infinity)
end
def kill(process, signal) when signal in [:sigkill, :sigterm] do
GenServer.call(process, {:kill, signal}, :infinity)
end
def await_exit(process, timeout \\ :infinity) do
GenServer.call(process, {:await_exit, timeout}, :infinity)
end
def os_pid(process) do
GenServer.call(process, :os_pid, :infinity)
end
def stop(process), do: GenServer.call(process, :stop, :infinity)
## Server
defmodule Pending do
defstruct bin: [], remaining: 0, client_pid: nil
end
defstruct [
:args,
:errno,
:port,
:socket_path,
:stdin,
:stdout,
:context,
:status,
await: %{},
pending_read: nil,
pending_write: nil
]
alias __MODULE__
def init(args) do
state = %__MODULE__{
args: args,
errno: nil,
status: :init,
await: %{},
pending_read: %Pending{},
pending_write: %Pending{}
}
{:ok, state, {:continue, nil}}
end
+ # Spawns the external program through the spawner executable and receives the
+ # two pipe descriptors it passes back over a temporary Unix domain socket.
def handle_continue(nil, state) do
+ # Trap exits so port/linked-process termination arrives as {:EXIT, _, _} messages.
+ Elixir.Process.flag(:trap_exit, true)
+
%{cmd_with_args: cmd_with_args, cd: cd, env: env} = state.args
socket_path = socket_path()
{:ok, uds} = :socket.open(:local, :stream, :default)
_ = :file.delete(socket_path)
{:ok, _} = :socket.bind(uds, %{family: :local, path: socket_path})
:ok = :socket.listen(uds)
port = exec(cmd_with_args, socket_path, env, cd)
{:os_pid, os_pid} = Port.info(port, :os_pid)
Exile.Watcher.watch(self(), os_pid, socket_path)
{write_fd_int, read_fd_int} = receive_fds(uds)
+ # The listening socket is only needed for the one-off fd handshake; close it
+ # so the server does not hold the descriptor for its whole lifetime.
+ _ = :socket.close(uds)
{:ok, write_fd} = Nif.nif_create_fd(write_fd_int)
{:ok, read_fd} = Nif.nif_create_fd(read_fd_int)
+ # NOTE(review): `read_fd` is stored as `stdin` (the descriptor this server writes
+ # to) and `write_fd` as `stdout` (the one it reads from); the read/write names
+ # appear to be from the external program's point of view — confirm.
{:noreply,
%Process{
state
| port: port,
status: :start,
socket_path: socket_path,
stdin: read_fd,
stdout: write_fd
}}
end
def handle_call(:stop, _from, state) do
# TODO: pending write and read should receive "stopped" return
# value instead of exit signal
case state.status do
{:exit, _} -> :ok
_ -> Port.close(state.port)
end
{:stop, :normal, :ok, state}
end
def handle_call(:close_stdin, _from, state) do
case state.status do
{:exit, _} -> {:reply, :ok, state}
_ -> do_close(state, :stdin)
end
end
def handle_call({:await_exit, _}, _from, %{status: {:exit, status}} = state) do
{:reply, {:ok, {:exit, status}}, state}
end
def handle_call({:await_exit, timeout}, from, %{status: :start} = state) do
tref =
if timeout != :infinity do
Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
else
nil
end
- {:noreply, put_timer(state, from, :timeout, tref)}
+ {:noreply, %Process{state | await: Map.put(state.await, from, tref)}}
end
- def handle_call(_, _from, %{status: {:exit, status}} = state),
- do: {:reply, {:error, {:exit, status}}, state}
+ def handle_call(_, _from, %{status: {:exit, status}} = state) do
+ {:reply, {:error, {:exit, status}}, state}
+ end
def handle_call({:write, binary}, from, state) do
cond do
!is_binary(binary) ->
{:reply, {:error, :not_binary}, state}
state.pending_write.client_pid ->
{:reply, {:error, :pending_write}, state}
true ->
pending = %Pending{bin: binary, client_pid: from}
do_write(%Process{state | pending_write: pending})
end
end
def handle_call({:read, size}, from, state) do
if state.pending_read.client_pid do
{:reply, {:error, :pending_read}, state}
else
pending = %Pending{remaining: size, client_pid: from}
do_read(%Process{state | pending_read: pending})
end
end
def handle_call(:os_pid, _from, state) do
case Port.info(state.port, :os_pid) do
{:os_pid, os_pid} ->
{:reply, {:ok, os_pid}, state}
:undefined ->
Logger.debug("Process not alive")
{:reply, :undefined, state}
end
end
def handle_call({:kill, signal}, _from, state) do
{:reply, signal(state.port, signal), state}
end
def handle_info({:await_exit_timeout, from}, state) do
GenServer.reply(from, :timeout)
- {:noreply, clear_await(state, from)}
+ {:noreply, %Process{state | await: Map.delete(state.await, from)}}
end
def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
def handle_info({:select, _read_resource, _ref, :ready_input}, state), do: do_read(state)
def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state),
do: handle_port_exit(exit_status, state)
- def handle_info(msg, _state), do: raise(msg)
+ def handle_info({:EXIT, port, :normal}, %{port: port} = state), do: {:noreply, state}
+
+ def handle_info({:EXIT, _, reason}, state), do: {:stop, reason, state}
defp handle_port_exit(exit_status, state) do
- handle_porcess_exit(exit_status, state)
- {:noreply, %{state | status: {:exit, exit_status}}}
+ # Answer every caller blocked in await_exit and cancel its timeout timer (if any).
+ Enum.each(state.await, fn {from, tref} ->
+ GenServer.reply(from, {:ok, {:exit, exit_status}})
+
+ if tref do
+ Elixir.Process.cancel_timer(tref)
+ end
+ end)
+
+ # `await` must be reset inside the struct update; as a third tuple element it
+ # would be taken by GenServer as a timeout/hibernate/continue instruction.
+ {:noreply, %Process{state | status: {:exit, exit_status}, await: %{}}}
end
# Writes the pending binary to the external program's stdin descriptor.
#
# Replies to the waiting client once the whole binary has been written or a
# non-retryable error occurs; otherwise the unwritten remainder stays in
# `pending_write`. Always returns a `{:noreply, state}` tuple.
defp do_write(%Process{pending_write: %Pending{bin: <<>>}} = state) do
  # Nothing left to write: acknowledge the client and free the slot.
  GenServer.reply(state.pending_write.client_pid, :ok)
  {:noreply, %Process{state | pending_write: %Pending{}}}
end

defp do_write(%Process{pending_write: pending} = state) do
  case Nif.nif_write(state.stdin, pending.bin) do
    {:ok, size} ->
      if size < byte_size(pending.bin) do
        # Partial write: keep only the unwritten tail.
        # NOTE(review): presumably the NIF arranges a `:ready_output` select
        # message so the remainder gets written — confirm.
        rest = binary_part(pending.bin, size, byte_size(pending.bin) - size)
        {:noreply, %Process{state | pending_write: %Pending{pending | bin: rest}}}
      else
        GenServer.reply(pending.client_pid, :ok)
        {:noreply, %Process{state | pending_write: %Pending{}}}
      end

    {:error, :eagain} ->
      # Leave the pending write untouched; do_write runs again when a
      # `:ready_output` select message is handled.
      {:noreply, state}

    {:error, errno} ->
      # Free the pending slot (as do_read does for reads); otherwise every
      # later write would be rejected with {:error, :pending_write}.
      GenServer.reply(pending.client_pid, {:error, errno})
      {:noreply, %Process{state | pending_write: %Pending{}, errno: errno}}
  end
end
# Services the pending read against the external program's stdout descriptor.
#
# First clause handles `:unbuffered` reads: the client is answered with the
# result of a single `nif_read`. Second clause handles sized reads: data is
# accumulated in `pending.bin` until `pending.remaining` bytes were collected
# or an empty read is seen. Both clauses return `{:noreply, state}`.
defp do_read(%Process{pending_read: %Pending{remaining: :unbuffered} = pending} = state) do
# NOTE(review): -1 is passed as the size for unbuffered reads — presumably
# meaning "whatever is available"; confirm against the NIF.
case Nif.nif_read(state.stdout, -1) do
# An empty binary is reported to the client as end of file.
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, []})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
GenServer.reply(pending.client_pid, {:ok, binary})
{:noreply, %Process{state | pending_read: %Pending{}}}
# Keep the pending read; do_read runs again when a `:ready_input` select
# message is handled.
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %Process{state | pending_read: %Pending{}, errno: errno}}
end
end
defp do_read(%Process{pending_read: pending} = state) do
case Nif.nif_read(state.stdout, pending.remaining) do
# Empty read: hand back whatever was accumulated so far, tagged as :eof.
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, pending.bin})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
if byte_size(binary) < pending.remaining do
# Short read: append to the accumulated iodata and keep waiting for the rest.
pending = %Pending{
pending
| bin: [pending.bin | binary],
remaining: pending.remaining - byte_size(binary)
}
{:noreply, %Process{state | pending_read: pending}}
else
# Request satisfied: reply with the accumulated iodata plus this chunk.
GenServer.reply(pending.client_pid, {:ok, [state.pending_read.bin | binary]})
{:noreply, %Process{state | pending_read: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
# Any other error ends the read: the client gets the errno and the slot is freed.
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | pending_read: %Pending{}, errno: errno}}
end
end
# Closes one of the descriptors connected to the external program.
#
# `type` is `:stdin` to close the stdin descriptor; any other value selects
# the stdout descriptor. Returns a GenServer reply tuple: `:ok` on success,
# `{:error, errno}` otherwise (the errno is also recorded in the state).
defp do_close(state, type) do
  fd =
    case type do
      :stdin -> state.stdin
      _ -> state.stdout
    end

  case Nif.nif_close(fd) do
    :ok ->
      {:reply, :ok, state}

    {:error, errno} ->
      # Report the failure to the caller instead of raising: `raise errno` on a
      # bare errno term is not a valid exception and made this reply unreachable.
      {:reply, {:error, errno}, %Process{state | errno: errno}}
  end
end
- defp clear_await(state, from) do
- %Process{state | await: Map.delete(state.await, from)}
- end
-
- defp cancel_timer(state, from, key) do
- case get_timer(state, from, key) do
- nil -> :ok
- tref -> Elixir.Process.cancel_timer(tref)
- end
- end
-
- defp put_timer(state, from, key, timer) do
- if Map.has_key?(state.await, from) do
- await = put_in(state.await, [from, key], timer)
- %Process{state | await: await}
- else
- %Process{state | await: %{from => %{key => timer}}}
- end
- end
-
- defp get_timer(state, from, key), do: get_in(state.await, [from, key])
-
- defp handle_porcess_exit(exit_status, state) do
- Enum.reduce(state.await, state, fn {from, _}, state ->
- GenServer.reply(from, {:ok, {:exit, exit_status}})
- cancel_timer(state, from, :timeout)
- clear_await(state, from)
- end)
- end
-
# Resolves `cmd` to an executable path via `System.find_executable/1`.
# Returns `{:ok, path_as_charlist}` or `{:error, message}` when it is not found.
defp normalize_cmd(cmd) do
  case System.find_executable(cmd) do
    nil -> {:error, "command not found: #{inspect(cmd)}"}
    executable -> {:ok, to_charlist(executable)}
  end
end
# Converts every command argument to a charlist.
# Returns `{:ok, charlists}` for a list, `{:error, message}` for anything else.
defp normalize_cmd_args(args) when is_list(args) do
  {:ok, Enum.map(args, fn arg -> to_charlist(arg) end)}
end

defp normalize_cmd_args(args) do
  {:error, "command arguments must be list of strings. #{inspect(args)}"}
end
defp normalize_cd(nil), do: {:ok, ''}
defp normalize_cd(cd) do
if File.exists?(cd) && File.dir?(cd) do
{:ok, to_charlist(cd)}
else
{:error, "`:cd` must be valid directory path"}
end
end
# Converts the environment key/value pairs to charlist tuples.
# `nil` yields an empty environment; the result is always `{:ok, pairs}`.
defp normalize_env(env) do
  case env do
    nil ->
      {:ok, []}

    pairs ->
      charlist_pairs =
        Enum.map(pairs, fn {name, val} -> {to_charlist(name), to_charlist(val)} end)

      {:ok, charlist_pairs}
  end
end
# Checks that `opts` contains no keys other than `:cd` and `:env`.
# Returns `:ok`, or `{:error, message}` listing the unexpected options.
defp validate_opts_fields(opts) do
  case Keyword.split(opts, [:cd, :env]) do
    {_known, []} -> :ok
    {_known, unknown} -> {:error, "invalid opts: #{inspect(unknown)}"}
  end
end
defp normalize_args([cmd | args], opts) when is_list(opts) do
with {:ok, cmd} <- normalize_cmd(cmd),
{:ok, args} <- normalize_cmd_args(args),
:ok <- validate_opts_fields(opts),
{:ok, cd} <- normalize_cd(opts[:cd]),
{:ok, env} <- normalize_env(opts[:env]) do
{:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env}}
end
end
defp normalize_args(_, _), do: {:error, "invalid arguments"}
@spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
# Opens a port running the spawner executable, passing the socket path followed
# by the command and its arguments. `:cd` and `:env` port options are added
# only when the respective value is truthy. Returns the opened port.
defp exec(cmd_with_args, socket_path, env, cd) do
  cd_opts = if cd, do: [cd: cd], else: []
  extra_opts = if env, do: [{:env, env} | cd_opts], else: cd_opts

  base_opts = [
    :nouse_stdio,
    :exit_status,
    :binary,
    {:args, [socket_path | cmd_with_args]}
  ]

  Port.open({:spawn_executable, @spawner_path}, base_opts ++ extra_opts)
end
defp socket_path do
dir = System.tmp_dir!()
Path.join(dir, random_string())
end
defp random_string do
:crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
end
defp signal(port, sig) when sig in [:sigkill, :sigterm] do
case Port.info(port, :os_pid) do
{:os_pid, os_pid} -> Nif.nif_kill(os_pid, sig)
:undefined -> {:error, :process_not_alive}
end
end
@socket_timeout 2000
# Accepts one connection on the listening Unix socket `uds` and extracts the
# two file descriptors passed as ancillary (`:rights`) data.
#
# Returns `{write_fd_int, read_fd_int}`. Raises when accept does not return
# `{:ok, socket}` (e.g. nothing connected within `@socket_timeout`).
defp receive_fds(uds) do
  case :socket.accept(uds, @socket_timeout) do
    {:ok, usock} ->
      {:ok, msg} = :socket.recvmsg(usock)

      # The control message payload carries two native-endian 32-bit integers:
      # first the read descriptor, then the write descriptor.
      %{
        ctrl: [
          %{
            data: <<read_fd_int::native-32, write_fd_int::native-32, _rest::binary>>,
            level: :socket,
            type: :rights
          }
        ]
      } = msg

      :socket.close(usock)
      {write_fd_int, read_fd_int}

    error ->
      # `raise/1` needs a message or an exception; raising the raw error tuple
      # would surface as an unrelated ArgumentError that hides the real reason.
      raise "failed to receive file descriptors from spawner: #{inspect(error)}"
  end
end
end
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index caba325..ca9e0e1 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,333 +1,367 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
alias Exile.Process
test "read" do
{:ok, s} = Process.start_link(~w(echo test))
assert {:eof, iodata} = Process.read(s, 100)
assert IO.iodata_to_binary(iodata) == "test\n"
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "write" do
{:ok, s} = Process.start_link(~w(cat))
assert :ok == Process.write(s, "hello")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "hello"
assert :ok == Process.write(s, "world")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "world"
assert :ok == Process.close_stdin(s)
assert {:eof, []} == Process.read(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
end
test "stdin close" do
logger = start_events_collector()
# base64 produces output only after getting EOF from stdin. we
# collect events in order and assert that we can still read from
# stdout even after closing stdin
{:ok, s} = Process.start_link(~w(base64))
# parallel reader should be blocked till we close stdin
start_parallel_reader(s, logger)
:timer.sleep(100)
assert :ok == Process.write(s, "hello")
add_event(logger, {:write, "hello"})
assert :ok == Process.write(s, "world")
add_event(logger, {:write, "world"})
:timer.sleep(100)
assert :ok == Process.close_stdin(s)
add_event(logger, :input_close)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
assert [
{:write, "hello"},
{:write, "world"},
:input_close,
{:read, "aGVsbG93b3JsZA==\n"},
:eof
] == get_events(logger)
end
test "external command termination on stop" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
:timer.sleep(100)
refute os_process_alive?(os_pid)
end
test "external command kill on stop" do
# NOTE(review): judging by its name, the fixture script presumably ignores
# SIGTERM, so stopping the server is expected to end with the OS process
# being killed — confirm against the fixture.
{:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
# If the OS process is still alive right after stop, wait 3 seconds and
# require it to be gone by then.
if os_process_alive?(os_pid) do
:timer.sleep(3000)
refute os_process_alive?(os_pid)
else
:ok
end
end
test "exit status" do
{:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
assert {:ok, {:exit, 10}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "writing binary larger than pipe buffer size" do
large_bin = generate_binary(5 * 65535)
{:ok, s} = Process.start_link(~w(cat))
writer =
Task.async(fn ->
Process.write(s, large_bin)
Process.close_stdin(s)
end)
:timer.sleep(100)
{_, iodata} = Process.read(s, 5 * 65535)
Task.await(writer)
assert IO.iodata_length(iodata) == 5 * 65535
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
{:ok, s} = Process.start_link(~w(cat))
large_bin = generate_binary(65535 * 5)
writer =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.write(s, large_bin)
add_event(logger, {:write, i})
end)
Process.close_stdin(s)
end)
:timer.sleep(50)
reader =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.read(s, 5 * 65535)
add_event(logger, {:read, i})
# delay in reading should delay writes
:timer.sleep(10)
end)
end)
Task.await(writer)
Task.await(reader)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
assert [
write: 1,
read: 1,
write: 2,
read: 2,
write: 3,
read: 3,
write: 4,
read: 4,
write: 5,
read: 5,
write: 6,
read: 6,
write: 7,
read: 7,
write: 8,
read: 8,
write: 9,
read: 9,
write: 10,
read: 10
] == get_events(logger)
end
# this test does not work properly in linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
Process.stop(s)
open_files = parse_lsof(bin)
assert [%{fd: "0", name: _, type: "PIPE"}, %{type: "PIPE", fd: "1", name: _}] = open_files
end
test "process kill with pending write" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
large_data =
Stream.cycle(["test"]) |> Stream.take(500_000) |> Enum.to_list() |> IO.iodata_to_binary()
task =
Task.async(fn ->
try do
Process.write(s, large_data)
catch
:exit, reason -> reason
end
end)
:timer.sleep(200)
Process.stop(s)
:timer.sleep(3000)
refute os_process_alive?(os_pid)
assert {:normal, _} = Task.await(task)
end
+ test "concurrent read" do
+ {:ok, s} = Process.start_link(~w(cat))
+
+ task = Task.async(fn -> Process.read(s, 1) end)
+
+ # delaying concurrent read to avoid race-condition
+ Elixir.Process.sleep(100)
+ assert {:error, :pending_read} = Process.read(s, 1)
+
+ assert :ok == Process.close_stdin(s)
+ assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
+ Process.stop(s)
+ _ = Task.await(task)
+ end
+
test "cd" do
parent = Path.expand("..", File.cwd!())
{:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
{:ok, dir} = Process.read(s)
assert String.trim(dir) == parent
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "invalid path" do
assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
end
test "invalid opt" do
assert {:error, "invalid opts: [invalid: :test]"} =
Process.start_link(~w(cat), invalid: :test)
end
test "env" do
assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
assert {:ok, "test\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "if external process inherits beam env" do
:ok = System.put_env([{"BEAM_ENV_A", "10"}])
assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
assert {:ok, "10\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "if user env overrides beam env" do
:ok = System.put_env([{"BEAM_ENV", "base"}])
assert {:ok, s} =
Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
assert {:ok, "overridden\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
+ test "await_exit when process is stopped" do
+ assert {:ok, s} = Process.start_link(~w(cat))
+
+ tasks =
+ Enum.map(1..10, fn _ ->
+ Task.async(fn -> Process.await_exit(s) end)
+ end)
+
+ assert :ok == Process.close_stdin(s)
+
+ Elixir.Process.sleep(100)
+
+ Enum.each(tasks, fn task ->
+ assert {:ok, {:exit, 0}} = Task.await(task)
+ end)
+
+ Process.stop(s)
+ end
+
def start_parallel_reader(proc_server, logger) do
spawn_link(fn -> reader_loop(proc_server, logger) end)
end
def reader_loop(proc_server, logger) do
case Process.read(proc_server) do
{:ok, data} ->
add_event(logger, {:read, data})
reader_loop(proc_server, logger)
{:eof, []} ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
# Builds a binary of `size` bytes, each being the letter "A".
# `size` must be a non-negative integer.
defp generate_binary(size) do
  String.duplicate("A", size)
end
end

File Metadata

Mime Type
text/x-diff
Expires
Thu, Nov 28, 8:20 AM (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40986
Default Alt Text
(24 KB)

Event Timeline