Page MenuHomePhorge

No OneTemporary

Size
29 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/exile.ex b/lib/exile.ex
index 925e103..d7e0aa4 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,43 +1,57 @@
defmodule Exile do
@moduledoc """
Exile is an alternative for beam ports with back-pressure and non-blocking IO
"""
+ use Application
+
+ @doc false
+ def start(_type, _args) do
+ opts = [
+ name: Exile.WatcherSupervisor,
+ strategy: :one_for_one
+ ]
+
+ # we use DynamicSupervisor for cleaning up external processes on
+ # :init.stop or SIGTERM
+ DynamicSupervisor.start_link(opts)
+ end
+
@doc """
Runs the given command with arguments and return an Enumerable to read command output.
First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
1. input as Enumerable:
```elixir
# List
Exile.stream!(~w(bc -q), input: ["1+1\n", "2*2\n"]) |> Enum.to_list()
# Stream
Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
```
2. input as collectable:
If the input in a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. Return value of the function is ignored.
```elixir
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
By defaults no input will be given to the command
* `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
* `chunk_size` - Size of each iodata chunk emitted by Enumerable stream. When set to `nil` the output is unbuffered and chunk size will be variable. Defaults to 65535
All other options are passed to `Exile.Process.start_link/3`
### Examples
``` elixir
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
"""
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index fb08cd3..c74c8b3 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,403 +1,343 @@
defmodule Exile.Process do
@moduledoc """
GenServer which wraps spawned external command.
One should use `Exile.stream!` over `Exile.Process`. stream internally manages this server for you. Use this only if you need more control over the life-cycle OS process.
## Overview
`Exile.Process` is an alternative primitive for Port. It has different interface and approach to running external programs to solve the issues associated with the ports.
### When compared to Port
* it is demand driven. User explicitly has to `read` output of the command and the progress of the external command is controlled using OS pipes. so unlike Port, this never cause memory issues in beam by loading more than we can consume
* it can close stdin of the program explicitly
* does not create zombie process. It always tries to cleanup resources
At high level it makes non-blocking asynchronous system calls to execute and interact with the external program. It completely bypasses beam implementation for the same using NIF. It uses `select()` system call for asynchronous IO. Most of the system calls are non-blocking, so it does not has adverse effect on scheduler. Issues such as "scheduler collapse".
"""
alias Exile.ProcessNif
require Logger
use GenServer
defmacro fork_exec_failure(), do: 125
# delay between exit_check when io is busy (in milliseconds)
@exit_check_timeout 5
@default_opts [stderr_to_console: false, env: []]
@doc """
Starts `Exile.ProcessServer`
Starts external program using `cmd_with_args` with options `opts`
`cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
* `cd` - the directory to run the command in
* `env` - an enumerable of tuples containing environment key-value. These can be accessed in the external program
* `stderr_to_console` - whether to print stderr output to console. Defaults to `false`
"""
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
GenServer.start(__MODULE__, %{cmd_with_args: cmd_with_args, opts: opts})
end
def close_stdin(process) do
GenServer.call(process, :close_stdin, :infinity)
end
def write(process, iodata) do
GenServer.call(process, {:write, IO.iodata_to_binary(iodata)}, :infinity)
end
def read(process, size) when is_integer(size) or size == :unbuffered do
GenServer.call(process, {:read, size}, :infinity)
end
def read(process) do
GenServer.call(process, {:read, :unbuffered}, :infinity)
end
def kill(process, signal) when signal in [:sigkill, :sigterm] do
GenServer.call(process, {:kill, signal}, :infinity)
end
def await_exit(process, timeout \\ :infinity) do
GenServer.call(process, {:await_exit, timeout}, :infinity)
end
def os_pid(process) do
GenServer.call(process, :os_pid, :infinity)
end
def stop(process), do: GenServer.call(process, :stop, :infinity)
## Server
defmodule Pending do
defstruct bin: [], remaining: 0, client_pid: nil
end
defstruct [
:cmd_with_args,
:opts,
:errno,
:context,
:status,
await: %{},
pending_read: nil,
pending_write: nil
]
alias __MODULE__
def init(args) do
%{cmd_with_args: [cmd | args], opts: opts} = args
path = System.find_executable(cmd)
unless path do
raise "Command not found: #{cmd}"
end
if opts[:cd] do
if !File.exists?(opts[:cd]) || !File.dir?(opts[:cd]) do
raise ":dir is not a valid path"
end
end
state = %__MODULE__{
cmd_with_args: [path | args],
opts: opts,
errno: nil,
status: :init,
await: %{},
pending_read: %Pending{},
pending_write: %Pending{}
}
{:ok, state, {:continue, nil}}
end
def handle_continue(nil, state) do
exec_args = Enum.map(state.cmd_with_args, &to_charlist/1)
stderr_to_console = if state.opts[:stderr_to_console], do: 1, else: 0
cd = to_charlist(state.opts[:cd] || "")
env = normalize_env(state.opts[:env] || %{})
case ProcessNif.execute(exec_args, env, cd, stderr_to_console) do
{:ok, context} ->
- start_watcher(context)
+ {:ok, _} = Exile.Watcher.watch(self(), context)
{:noreply, %Process{state | context: context, status: :start}}
{:error, errno} ->
raise "Failed to start command: #{state.cmd_with_args}, errno: #{errno}"
end
end
def handle_call(:stop, _from, state) do
# watcher will take care of termination of external process
# TODO: pending write and read should receive "stopped" return
# value instead of exit signal
{:stop, :normal, :ok, state}
end
def handle_call(_, _from, %{status: {:exit, status}}), do: {:reply, {:error, {:exit, status}}}
def handle_call({:await_exit, timeout}, from, state) do
tref =
if timeout != :infinity do
Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
else
nil
end
state = put_timer(state, from, :timeout, tref)
check_exit(state, from)
end
def handle_call({:write, binary}, from, state) when is_binary(binary) do
pending = %Pending{bin: binary, client_pid: from}
do_write(%Process{state | pending_write: pending})
end
def handle_call({:read, size}, from, state) do
pending = %Pending{remaining: size, client_pid: from}
do_read(%Process{state | pending_read: pending})
end
def handle_call(:close_stdin, _from, state), do: do_close(state, :stdin)
def handle_call(:os_pid, _from, state), do: {:reply, ProcessNif.os_pid(state.context), state}
def handle_call({:kill, signal}, _from, state) do
do_kill(state.context, signal)
{:reply, :ok, %{state | status: {:exit, :killed}}}
end
def handle_info({:check_exit, from}, state), do: check_exit(state, from)
def handle_info({:await_exit_timeout, from}, state) do
cancel_timer(state, from, :check)
receive do
{:check_exit, ^from} -> :ok
after
0 -> :ok
end
GenServer.reply(from, :timeout)
{:noreply, clear_await(state, from)}
end
def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
def handle_info({:select, _read_resource, _ref, :ready_input}, state), do: do_read(state)
def handle_info(msg, _state), do: raise(msg)
defp do_write(%Process{pending_write: %Pending{bin: <<>>}} = state) do
GenServer.reply(state.pending_write.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
defp do_write(%Process{pending_write: pending} = state) do
case ProcessNif.sys_write(state.context, pending.bin) do
{:ok, size} ->
if size < byte_size(pending.bin) do
binary = binary_part(pending.bin, size, byte_size(pending.bin) - size)
{:noreply, %{state | pending_write: %Pending{pending | bin: binary}}}
else
GenServer.reply(pending.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | errno: errno}}
end
end
defp do_read(%Process{pending_read: %Pending{remaining: :unbuffered} = pending} = state) do
case ProcessNif.sys_read(state.context, -1) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, []})
{:noreply, state}
{:ok, binary} ->
GenServer.reply(pending.client_pid, {:ok, binary})
{:noreply, state}
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | errno: errno}}
end
end
defp do_read(%Process{pending_read: pending} = state) do
case ProcessNif.sys_read(state.context, pending.remaining) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, pending.bin})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
if byte_size(binary) < pending.remaining do
pending = %Pending{
pending
| bin: [pending.bin | binary],
remaining: pending.remaining - byte_size(binary)
}
{:noreply, %Process{state | pending_read: pending}}
else
GenServer.reply(pending.client_pid, {:ok, [state.pending_read.bin | binary]})
{:noreply, %Process{state | pending_read: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | pending_read: %Pending{}, errno: errno}}
end
end
defp check_exit(state, from) do
case ProcessNif.sys_wait(state.context) do
{:ok, {:exit, fork_exec_failure()}} ->
GenServer.reply(from, {:error, :failed_to_execute})
cancel_timer(state, from, :timeout)
{:noreply, clear_await(state, from)}
{:ok, status} ->
GenServer.reply(from, {:ok, status})
cancel_timer(state, from, :timeout)
{:noreply, clear_await(state, from)}
{:error, {0, _}} ->
# Ideally we should not poll and we should handle this with SIGCHLD signal
tref = Elixir.Process.send_after(self(), {:check_exit, from}, @exit_check_timeout)
{:noreply, put_timer(state, from, :check, tref)}
{:error, {-1, status}} ->
GenServer.reply(from, {:error, status})
cancel_timer(state, from, :timeout)
{:noreply, clear_await(state, from)}
end
end
defp do_kill(context, :sigkill), do: ProcessNif.sys_kill(context)
defp do_kill(context, :sigterm), do: ProcessNif.sys_terminate(context)
defp do_close(state, type) do
case ProcessNif.sys_close(state.context, stream_type(type)) do
:ok ->
{:reply, :ok, state}
{:error, errno} ->
raise errno
{:reply, {:error, errno}, %Process{state | errno: errno}}
end
end
defp clear_await(state, from) do
%Process{state | await: Map.delete(state.await, from)}
end
defp cancel_timer(state, from, key) do
case get_timer(state, from, key) do
nil -> :ok
tref -> Elixir.Process.cancel_timer(tref)
end
end
defp put_timer(state, from, key, timer) do
if Map.has_key?(state.await, from) do
await = put_in(state.await, [from, key], timer)
%Process{state | await: await}
else
%Process{state | await: %{from => %{key => timer}}}
end
end
defp get_timer(state, from, key), do: get_in(state.await, [from, key])
- # Try to gracefully terminate external process if the genserver associated with the process is killed
- defp start_watcher(context) do
- process_server = self()
- watcher_pid = spawn(fn -> watcher(process_server, context) end)
-
- receive do
- {^watcher_pid, :done} -> :ok
- end
- end
-
defp stream_type(:stdin), do: 0
defp stream_type(:stdout), do: 1
- defp process_exit?(context) do
- match?({:ok, _}, ProcessNif.sys_wait(context))
- end
-
- defp process_exit?(context, timeout) do
- if process_exit?(context) do
- true
- else
- :timer.sleep(timeout)
- process_exit?(context)
- end
- end
-
defp normalize_env(env) do
Enum.map(env, fn {key, value} ->
(String.trim(key) <> "=" <> String.trim(value))
|> to_charlist()
end)
end
-
- # for proper process exit parent of the child *must* wait() for
- # child processes termination exit and "pickup" after the exit
- # (receive child exit_status). Resources acquired by child such as
- # file descriptors won't be released even if the child process
- # itself is terminated.
- defp watcher(process_server, context) do
- ref = Elixir.Process.monitor(process_server)
- send(process_server, {self(), :done})
-
- receive do
- {:DOWN, ^ref, :process, ^process_server, _reason} ->
- try do
- Logger.debug(fn -> "Stopping external program" end)
-
- # sys_close is idempotent, calling it multiple times is okay
- ProcessNif.sys_close(context, stream_type(:stdin))
- ProcessNif.sys_close(context, stream_type(:stdout))
-
- # at max we wait for 100ms for program to exit
- process_exit?(context, 100) && throw(:done)
-
- Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
- ProcessNif.sys_terminate(context)
- process_exit?(context, 100) && throw(:done)
-
- Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
- ProcessNif.sys_kill(context)
- process_exit?(context, 1000) && throw(:done)
-
- Logger.error("[exile] failed to kill external process")
- raise "Failed to kill external process"
- catch
- :done -> Logger.debug(fn -> "External program exited successfully" end)
- end
- end
- end
end
diff --git a/lib/exile/watcher.ex b/lib/exile/watcher.ex
new file mode 100644
index 0000000..1146906
--- /dev/null
+++ b/lib/exile/watcher.ex
@@ -0,0 +1,84 @@
+defmodule Exile.Watcher do
+ use GenServer, restart: :temporary
+ require Logger
+ alias Exile.ProcessNif
+
+ def start_link(args) do
+ {:ok, _pid} = GenServer.start_link(__MODULE__, args)
+ end
+
+ def watch(pid, context) do
+ spec = {Exile.Watcher, %{pid: pid, context: context}}
+ DynamicSupervisor.start_child(Exile.WatcherSupervisor, spec)
+ end
+
+ def init(args) do
+ %{pid: pid, context: context} = args
+ Process.flag(:trap_exit, true)
+ ref = Elixir.Process.monitor(pid)
+ {:ok, %{pid: pid, context: context, ref: ref}}
+ end
+
+ def handle_info({:DOWN, ref, :process, pid, _reason}, %{pid: pid, context: context, ref: ref}) do
+ attempt_graceful_exit(context)
+ {:stop, :normal, nil}
+ end
+
+ def handle_info({:EXIT, _, reason}, nil), do: {:stop, reason, nil}
+
+ # when watcher is attempted to be killed, we forcefully kill external os process.
+ # This can happen when beam receive SIGTERM
+ def handle_info({:EXIT, _, reason}, %{pid: pid, context: context}) do
+ Logger.debug(fn -> "Watcher exiting. reason: #{inspect(reason)}" end)
+ Exile.Process.stop(pid)
+ attempt_graceful_exit(context)
+ {:stop, reason, nil}
+ end
+
+ # for proper process exit parent of the child *must* wait() for
+ # child processes termination exit and "pickup" after the exit
+ # (receive child exit_status). Resources acquired by child such as
+ # file descriptors won't be released even if the child process
+ # itself is terminated.
+ defp attempt_graceful_exit(context) do
+ try do
+ Logger.debug(fn -> "Stopping external program" end)
+
+ # sys_close is idempotent, calling it multiple times is okay
+ ProcessNif.sys_close(context, stream_type(:stdin))
+ ProcessNif.sys_close(context, stream_type(:stdout))
+
+ # at max we wait for 100ms for program to exit
+ process_exit?(context, 100) && throw(:done)
+
+ Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
+ ProcessNif.sys_terminate(context)
+ process_exit?(context, 100) && throw(:done)
+
+ Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
+ ProcessNif.sys_kill(context)
+ process_exit?(context, 1000) && throw(:done)
+
+ Logger.error("[exile] failed to kill external process")
+ raise "Failed to kill external process"
+ catch
+ :done -> Logger.debug(fn -> "External program exited successfully" end)
+ end
+ end
+
+ defp process_exit?(context) do
+ match?({:ok, _}, ProcessNif.sys_wait(context))
+ end
+
+ defp process_exit?(context, timeout) do
+ if process_exit?(context) do
+ true
+ else
+ :timer.sleep(timeout)
+ process_exit?(context)
+ end
+ end
+
+ defp stream_type(:stdin), do: 0
+ defp stream_type(:stdout), do: 1
+end
diff --git a/mix.exs b/mix.exs
index ff4e6ce..ffb9f00 100644
--- a/mix.exs
+++ b/mix.exs
@@ -1,30 +1,31 @@
defmodule Exile.MixProject do
use Mix.Project
def project do
[
app: :exile,
version: "0.1.0",
elixir: "~> 1.7",
start_permanent: Mix.env() == :prod,
compilers: [:elixir_make] ++ Mix.compilers(),
make_targets: ["all"],
make_clean: ["clean"],
deps: deps()
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
+ mod: {Exile, []},
extra_applications: [:logger]
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:elixir_make, "~> 0.6", runtime: false}
]
end
end
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index 16767d6..9acc3c2 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,300 +1,307 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
alias Exile.Process
test "read" do
{:ok, s} = Process.start_link(~w(echo test))
assert {:eof, iodata} = Process.read(s, 100)
assert IO.iodata_to_binary(iodata) == "test\n"
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
+ Process.stop(s)
end
test "write" do
{:ok, s} = Process.start_link(~w(cat))
assert :ok == Process.write(s, "hello")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "hello"
assert :ok == Process.write(s, "world")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "world"
assert :ok == Process.close_stdin(s)
assert {:eof, []} == Process.read(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
+ Process.stop(s)
end
test "stdin close" do
logger = start_events_collector()
# base64 produces output only after getting EOF from stdin. we
# collect events in order and assert that we can still read from
# stdout even after closing stdin
{:ok, s} = Process.start_link(~w(base64))
# parallel reader should be blocked till we close stdin
start_parallel_reader(s, logger)
:timer.sleep(100)
assert :ok == Process.write(s, "hello")
add_event(logger, {:write, "hello"})
assert :ok == Process.write(s, "world")
add_event(logger, {:write, "world"})
:timer.sleep(100)
assert :ok == Process.close_stdin(s)
add_event(logger, :input_close)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 50)
+ Process.stop(s)
assert [
{:write, "hello"},
{:write, "world"},
:input_close,
{:read, "aGVsbG93b3JsZA==\n"},
:eof
] == get_events(logger)
end
test "external command termination on stop" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
:timer.sleep(100)
refute os_process_alive?(os_pid)
end
test "external command kill on stop" do
# cat command hangs waiting for EOF
{:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
if os_process_alive?(os_pid) do
:timer.sleep(3000)
refute os_process_alive?(os_pid)
else
:ok
end
end
test "exit status" do
{:ok, s} = Process.start_link(~w(sh -c "exit 2"))
assert {:ok, {:exit, 2}} == Process.await_exit(s, 500)
+ Process.stop(s)
end
test "writing binary larger than pipe buffer size" do
large_bin = generate_binary(5 * 65535)
{:ok, s} = Process.start_link(~w(cat))
writer =
Task.async(fn ->
Process.write(s, large_bin)
Process.close_stdin(s)
end)
:timer.sleep(100)
{_, iodata} = Process.read(s, 5 * 65535)
Task.await(writer)
assert IO.iodata_length(iodata) == 5 * 65535
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
+ Process.stop(s)
end
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
{:ok, s} = Process.start_link(~w(cat))
large_bin = generate_binary(65535 * 5)
writer =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.write(s, large_bin)
add_event(logger, {:write, i})
end)
Process.close_stdin(s)
end)
:timer.sleep(50)
reader =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.read(s, 5 * 65535)
add_event(logger, {:read, i})
# delay in reading should delay writes
:timer.sleep(10)
end)
end)
Task.await(writer)
Task.await(reader)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
+ Process.stop(s)
assert [
write: 1,
read: 1,
write: 2,
read: 2,
write: 3,
read: 3,
write: 4,
read: 4,
write: 5,
read: 5,
write: 6,
read: 6,
write: 7,
read: 7,
write: 8,
read: 8,
write: 9,
read: 9,
write: 10,
read: 10
] == get_events(logger)
end
# this test does not work properly in linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
Process.stop(s)
open_files = parse_lsof(bin)
assert [%{fd: "0", name: _, type: "PIPE"}, %{type: "PIPE", fd: "1", name: _}] = open_files
end
test "process kill with pending write" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
large_data =
Stream.cycle(["test"]) |> Stream.take(500_000) |> Enum.to_list() |> IO.iodata_to_binary()
task =
Task.async(fn ->
try do
Process.write(s, large_data)
catch
:exit, reason -> reason
end
end)
:timer.sleep(200)
Process.stop(s)
:timer.sleep(3000)
refute os_process_alive?(os_pid)
assert {:normal, _} = Task.await(task)
end
test "cd" do
parent = Path.expand("..", File.cwd!())
{:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
{:ok, dir} = Process.read(s)
assert String.trim(dir) == parent
assert {:ok, {:exit, 0}} = Process.await_exit(s)
+ Process.stop(s)
end
test "invalid path" do
assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
end
test "env" do
assert {:ok, s} = Process.start_link([fixture("env.sh")], env: %{"TEST_ENV" => "test"})
assert {:ok, "test"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
end
def start_parallel_reader(proc_server, logger) do
spawn_link(fn -> reader_loop(proc_server, logger) end)
end
def reader_loop(proc_server, logger) do
case Process.read(proc_server) do
{:ok, data} ->
add_event(logger, {:read, data})
reader_loop(proc_server, logger)
{:eof, []} ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
Stream.repeatedly(fn -> "A" end) |> Enum.take(size) |> IO.iodata_to_binary()
end
end
diff --git a/test/exile/sync_process_test.exs b/test/exile/sync_process_test.exs
index cc53806..51a74e8 100644
--- a/test/exile/sync_process_test.exs
+++ b/test/exile/sync_process_test.exs
@@ -1,26 +1,54 @@
defmodule Exile.SyncProcessTest do
use ExUnit.Case, async: false
alias Exile.Process
@bin Stream.repeatedly(fn -> "A" end) |> Enum.take(65535) |> IO.iodata_to_binary()
test "memory leak" do
:timer.sleep(1000)
before_exec = :erlang.memory(:total)
{:ok, s} = Process.start_link(~w(cat))
Enum.each(1..500, fn _ ->
:ok = Process.write(s, @bin)
{:ok, _} = Process.read(s, 65535)
end)
:timer.sleep(1000)
after_exec = :erlang.memory(:total)
assert_in_delta before_exec, after_exec, 1024 * 1024
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
+ Process.stop(s)
+ end
+
+ test "if watcher process exits on command exit" do
+ stop_all_children(Exile.WatcherSupervisor)
+
+ assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
+ assert {:ok, s} = Process.start_link(~w(cat))
+
+ # we spawn in background
+ :timer.sleep(200)
+
+ assert %{active: 1, workers: 1} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
+
+ Process.close_stdin(s)
+ assert {:ok, {:exit, 0}} = Process.await_exit(s, 500)
+ Process.stop(s)
+
+ # wait for watcher to terminate
+ :timer.sleep(200)
+ assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
+ end
+
+ defp stop_all_children(sup) do
+ DynamicSupervisor.which_children(sup)
+ |> Enum.each(fn {_, pid, _, _} ->
+ DynamicSupervisor.terminate_child(sup, pid)
+ end)
end
end
diff --git a/test/scripts/env.sh b/test/scripts/env.sh
deleted file mode 100755
index e684d9e..0000000
--- a/test/scripts/env.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/bash
-
-printf ${TEST_ENV}

File Metadata

Mime Type
text/x-diff
Expires
Thu, Nov 28, 4:28 PM (1 d, 22 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
41067
Default Alt Text
(29 KB)

Event Timeline