Page MenuHomePhorge

No OneTemporary

Size
14 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/exile/watcher.ex b/lib/exile/watcher.ex
index acf7a3b..7e7def9 100644
--- a/lib/exile/watcher.ex
+++ b/lib/exile/watcher.ex
@@ -1,79 +1,79 @@
defmodule Exile.Watcher do
@moduledoc false
use GenServer, restart: :temporary
require Logger
alias Exile.ProcessNif, as: Nif
def start_link(args) do
{:ok, _pid} = GenServer.start_link(__MODULE__, args)
end
def watch(pid, os_pid, socket_path) do
spec = {Exile.Watcher, %{pid: pid, os_pid: os_pid, socket_path: socket_path}}
DynamicSupervisor.start_child(Exile.WatcherSupervisor, spec)
end
def init(args) do
%{pid: pid, os_pid: os_pid, socket_path: socket_path} = args
Process.flag(:trap_exit, true)
ref = Elixir.Process.monitor(pid)
{:ok, %{pid: pid, os_pid: os_pid, socket_path: socket_path, ref: ref}}
end
def handle_info({:DOWN, ref, :process, pid, _reason}, %{
pid: pid,
socket_path: socket_path,
os_pid: os_pid,
ref: ref
}) do
File.rm!(socket_path)
attempt_graceful_exit(os_pid)
{:stop, :normal, nil}
end
def handle_info({:EXIT, _, reason}, nil), do: {:stop, reason, nil}
# when watcher is attempted to be killed, we forcefully kill external os process.
# This can happen when beam receive SIGTERM
def handle_info({:EXIT, _, reason}, %{pid: pid, socket_path: socket_path, os_pid: os_pid}) do
Logger.debug(fn -> "Watcher exiting. reason: #{inspect(reason)}" end)
File.rm!(socket_path)
Exile.Process.stop(pid)
attempt_graceful_exit(os_pid)
{:stop, reason, nil}
end
defp attempt_graceful_exit(os_pid) do
try do
Logger.debug(fn -> "Stopping external program" end)
# at max we wait for 100ms for program to exit
process_exit?(os_pid, 100) && throw(:done)
Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
Nif.nif_kill(os_pid, :sigterm)
process_exit?(os_pid, 100) && throw(:done)
Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
Nif.nif_kill(os_pid, :sigkill)
- process_exit?(os_pid, 1000) && throw(:done)
+ process_exit?(os_pid, 200) && throw(:done)
Logger.error("failed to kill external process")
raise "Failed to kill external process"
catch
:done -> Logger.debug(fn -> "External program exited successfully" end)
end
end
defp process_exit?(os_pid), do: !Nif.nif_is_os_pid_alive(os_pid)
defp process_exit?(os_pid, timeout) do
if process_exit?(os_pid) do
true
else
:timer.sleep(timeout)
process_exit?(os_pid)
end
end
end
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index ca9e0e1..ab923e1 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,367 +1,366 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
alias Exile.Process
test "read" do
{:ok, s} = Process.start_link(~w(echo test))
assert {:eof, iodata} = Process.read(s, 100)
assert IO.iodata_to_binary(iodata) == "test\n"
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "write" do
{:ok, s} = Process.start_link(~w(cat))
assert :ok == Process.write(s, "hello")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "hello"
assert :ok == Process.write(s, "world")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "world"
assert :ok == Process.close_stdin(s)
assert {:eof, []} == Process.read(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
end
test "stdin close" do
logger = start_events_collector()
# base64 produces output only after getting EOF from stdin. we
# collect events in order and assert that we can still read from
# stdout even after closing stdin
{:ok, s} = Process.start_link(~w(base64))
# parallel reader should be blocked till we close stdin
start_parallel_reader(s, logger)
:timer.sleep(100)
assert :ok == Process.write(s, "hello")
add_event(logger, {:write, "hello"})
assert :ok == Process.write(s, "world")
add_event(logger, {:write, "world"})
:timer.sleep(100)
assert :ok == Process.close_stdin(s)
add_event(logger, :input_close)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
assert [
{:write, "hello"},
{:write, "world"},
:input_close,
{:read, "aGVsbG93b3JsZA==\n"},
:eof
] == get_events(logger)
end
test "external command termination on stop" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
:timer.sleep(100)
refute os_process_alive?(os_pid)
end
test "external command kill on stop" do
- # cat command hangs waiting for EOF
{:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
if os_process_alive?(os_pid) do
- :timer.sleep(3000)
+ :timer.sleep(1000)
refute os_process_alive?(os_pid)
else
:ok
end
end
test "exit status" do
{:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
assert {:ok, {:exit, 10}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "writing binary larger than pipe buffer size" do
large_bin = generate_binary(5 * 65535)
{:ok, s} = Process.start_link(~w(cat))
writer =
Task.async(fn ->
Process.write(s, large_bin)
Process.close_stdin(s)
end)
:timer.sleep(100)
{_, iodata} = Process.read(s, 5 * 65535)
Task.await(writer)
assert IO.iodata_length(iodata) == 5 * 65535
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
{:ok, s} = Process.start_link(~w(cat))
large_bin = generate_binary(65535 * 5)
writer =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.write(s, large_bin)
add_event(logger, {:write, i})
end)
Process.close_stdin(s)
end)
:timer.sleep(50)
reader =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.read(s, 5 * 65535)
add_event(logger, {:read, i})
# delay in reading should delay writes
:timer.sleep(10)
end)
end)
Task.await(writer)
Task.await(reader)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
assert [
write: 1,
read: 1,
write: 2,
read: 2,
write: 3,
read: 3,
write: 4,
read: 4,
write: 5,
read: 5,
write: 6,
read: 6,
write: 7,
read: 7,
write: 8,
read: 8,
write: 9,
read: 9,
write: 10,
read: 10
] == get_events(logger)
end
# this test does not work properly in linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
Process.stop(s)
open_files = parse_lsof(bin)
assert [%{fd: "0", name: _, type: "PIPE"}, %{type: "PIPE", fd: "1", name: _}] = open_files
end
test "process kill with pending write" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
large_data =
Stream.cycle(["test"]) |> Stream.take(500_000) |> Enum.to_list() |> IO.iodata_to_binary()
task =
Task.async(fn ->
try do
Process.write(s, large_data)
catch
:exit, reason -> reason
end
end)
:timer.sleep(200)
Process.stop(s)
:timer.sleep(3000)
refute os_process_alive?(os_pid)
assert {:normal, _} = Task.await(task)
end
test "concurrent read" do
{:ok, s} = Process.start_link(~w(cat))
task = Task.async(fn -> Process.read(s, 1) end)
# delaying concurrent read to avoid race-condition
Elixir.Process.sleep(100)
assert {:error, :pending_read} = Process.read(s, 1)
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
_ = Task.await(task)
end
test "cd" do
parent = Path.expand("..", File.cwd!())
{:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
{:ok, dir} = Process.read(s)
assert String.trim(dir) == parent
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "invalid path" do
assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
end
test "invalid opt" do
assert {:error, "invalid opts: [invalid: :test]"} =
Process.start_link(~w(cat), invalid: :test)
end
test "env" do
assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
assert {:ok, "test\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "if external process inherits beam env" do
:ok = System.put_env([{"BEAM_ENV_A", "10"}])
assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
assert {:ok, "10\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "if user env overrides beam env" do
:ok = System.put_env([{"BEAM_ENV", "base"}])
assert {:ok, s} =
Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
assert {:ok, "overridden\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "await_exit when process is stopped" do
assert {:ok, s} = Process.start_link(~w(cat))
tasks =
Enum.map(1..10, fn _ ->
Task.async(fn -> Process.await_exit(s) end)
end)
assert :ok == Process.close_stdin(s)
Elixir.Process.sleep(100)
Enum.each(tasks, fn task ->
assert {:ok, {:exit, 0}} = Task.await(task)
end)
Process.stop(s)
end
def start_parallel_reader(proc_server, logger) do
spawn_link(fn -> reader_loop(proc_server, logger) end)
end
def reader_loop(proc_server, logger) do
case Process.read(proc_server) do
{:ok, data} ->
add_event(logger, {:read, data})
reader_loop(proc_server, logger)
{:eof, []} ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
Stream.repeatedly(fn -> "A" end) |> Enum.take(size) |> IO.iodata_to_binary()
end
end
diff --git a/test/exile/watcher_test.exs b/test/exile/watcher_test.exs
new file mode 100644
index 0000000..fc55ba0
--- /dev/null
+++ b/test/exile/watcher_test.exs
@@ -0,0 +1,55 @@
+defmodule Exile.WatcherTest do
+ use ExUnit.Case, async: true
+ alias Exile.Process
+
+ test "uds path socket cleanup after successful exit" do
+ {:ok, s} = Process.start_link(~w(cat))
+ %{socket_path: socket_path} = :sys.get_state(s)
+
+ assert File.exists?(socket_path)
+
+ :ok = Process.close_stdin(s)
+ Elixir.Process.sleep(100)
+
+ {:ok, {:exit, 0}} = Process.await_exit(s)
+ :ok = Process.stop(s)
+
+ Elixir.Process.sleep(100)
+
+ refute File.exists?(socket_path)
+ end
+
+ test "external process and uds path cleanup on error" do
+ {:ok, s} = Process.start_link(~w(cat))
+ %{socket_path: socket_path} = :sys.get_state(s)
+ {:ok, os_pid} = Process.os_pid(s)
+
+ assert File.exists?(socket_path)
+ assert os_process_alive?(os_pid)
+
+ Elixir.Process.exit(s, :kill)
+ Elixir.Process.sleep(100)
+
+ refute File.exists?(socket_path)
+ refute os_process_alive?(os_pid)
+ end
+
+ test "if external process is killed with SIGTERM" do
+ {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+
+ {:ok, os_pid} = Process.os_pid(s)
+ assert os_process_alive?(os_pid)
+ Process.stop(s)
+
+ :timer.sleep(1000)
+ refute os_process_alive?(os_pid)
+ end
+
+ defp os_process_alive?(pid) do
+ match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
+ end
+
+ defp fixture(script) do
+ Path.join([__DIR__, "../scripts", script])
+ end
+end

File Metadata

Mime Type
text/x-diff
Expires
Thu, Nov 28, 8:20 AM (1 d, 17 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40987
Default Alt Text
(14 KB)

Event Timeline