Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F113882
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Size
8 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/exile/process/exec.ex b/lib/exile/process/exec.ex
index a844d22..42ec470 100644
--- a/lib/exile/process/exec.ex
+++ b/lib/exile/process/exec.ex
@@ -1,216 +1,220 @@
defmodule Exile.Process.Exec do
@moduledoc false
alias Exile.Process.Nif
alias Exile.Process.Pipe
alias Exile.Process.State
@type args :: %{
cmd_with_args: [String.t()],
cd: String.t(),
env: [{String.t(), String.t()}]
}
@spec start(args, State.stderr_mode()) :: %{
port: port,
stdin: non_neg_integer(),
stdout: non_neg_integer(),
stderr: non_neg_integer()
}
def start(args, stderr) do
%{cmd_with_args: cmd_with_args, cd: cd, env: env} = args
socket_path = socket_path()
{:ok, sock} = :socket.open(:local, :stream, :default)
try do
:ok = socket_bind(sock, socket_path)
:ok = :socket.listen(sock)
spawner_cmdline_args = [socket_path, to_string(stderr) | cmd_with_args]
port_opts =
[:nouse_stdio, :exit_status, :binary, args: spawner_cmdline_args] ++
prune_nils(env: env, cd: cd)
port = Port.open({:spawn_executable, spawner_path()}, port_opts)
{:os_pid, os_pid} = Port.info(port, :os_pid)
Exile.Watcher.watch(self(), os_pid, socket_path)
{stdin_fd, stdout_fd, stderr_fd} = receive_fds(sock, stderr)
%{port: port, stdin: stdin_fd, stdout: stdout_fd, stderr: stderr_fd}
after
:socket.close(sock)
File.rm!(socket_path)
end
end
@spec normalize_exec_args(nonempty_list(), keyword()) ::
{:ok,
%{
cmd_with_args: nonempty_list(),
cd: charlist,
env: env,
stderr: :console | :disable | :consume
}}
| {:error, String.t()}
def normalize_exec_args(cmd_with_args, opts) do
with {:ok, cmd} <- normalize_cmd(cmd_with_args),
{:ok, args} <- normalize_cmd_args(cmd_with_args),
:ok <- validate_opts_fields(opts),
{:ok, cd} <- normalize_cd(opts[:cd]),
{:ok, stderr} <- normalize_stderr(opts[:stderr]),
{:ok, env} <- normalize_env(opts[:env]) do
{:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, stderr: stderr}}
end
end
@spec spawner_path :: String.t()
defp spawner_path do
:filename.join(:code.priv_dir(:exile), "spawner")
end
@socket_timeout 2000
@spec receive_fds(:socket.socket(), State.stderr_mode()) :: {Pipe.fd(), Pipe.fd(), Pipe.fd()}
defp receive_fds(lsock, stderr_mode) do
{:ok, sock} = :socket.accept(lsock, @socket_timeout)
try do
{:ok, msg} = :socket.recvmsg(sock, @socket_timeout)
%{ctrl: [%{data: data, level: :socket, type: :rights}]} = msg
<<stdin_fd::native-32, stdout_fd::native-32, stderr_fd::native-32, _::binary>> = data
# FDs are managed by the NIF resource life-cycle
{:ok, stdout} = Nif.nif_create_fd(stdout_fd)
{:ok, stdin} = Nif.nif_create_fd(stdin_fd)
+ {:ok, stderr} = Nif.nif_create_fd(stderr_fd)
- {:ok, stderr} =
- if stderr == :consume do
- Nif.nif_create_fd(stderr_fd)
+ stderr =
+ if stderr_mode == :consume do
+ stderr
else
- {:ok, nil}
+ # we have to explicitly close FD passed over socket.
+ # Since it will be tracked by the OS and kept open until we close.
+ Nif.nif_close(stderr)
+ nil
end
{stdin, stdout, stderr}
after
:socket.close(sock)
end
end
# skip type warning till we change min OTP version to 24.
@dialyzer {:nowarn_function, socket_bind: 2}
defp socket_bind(sock, path) do
case :socket.bind(sock, %{family: :local, path: path}) do
:ok -> :ok
# for compatibility with OTP version < 24
{:ok, _} -> :ok
other -> other
end
end
@spec socket_path() :: String.t()
defp socket_path do
str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
path = Path.join(System.tmp_dir!(), str)
_ = :file.delete(path)
path
end
@spec prune_nils(keyword()) :: keyword()
defp prune_nils(kv) do
Enum.reject(kv, fn {_, v} -> is_nil(v) end)
end
@spec normalize_cmd(nonempty_list()) :: {:ok, nonempty_list()} | {:error, binary()}
defp normalize_cmd(arg) do
case arg do
[cmd | _] when is_binary(cmd) ->
path = System.find_executable(cmd)
if path do
{:ok, to_charlist(path)}
else
{:error, "command not found: #{inspect(cmd)}"}
end
_ ->
{:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
end
end
defp normalize_cmd_args([_ | args]) do
if Enum.all?(args, &is_binary/1) do
{:ok, Enum.map(args, &to_charlist/1)}
else
{:error, "command arguments must be list of strings. #{inspect(args)}"}
end
end
@spec normalize_cd(binary) :: {:ok, charlist()} | {:error, String.t()}
defp normalize_cd(cd) do
case cd do
nil ->
{:ok, ''}
cd when is_binary(cd) ->
if File.exists?(cd) && File.dir?(cd) do
{:ok, to_charlist(cd)}
else
{:error, "`:cd` must be valid directory path"}
end
_ ->
{:error, "`:cd` must be a binary string"}
end
end
@type env :: list({String.t(), String.t()})
@spec normalize_env(env) :: {:ok, env} | {:error, String.t()}
defp normalize_env(env) do
case env do
nil ->
{:ok, []}
env when is_list(env) or is_map(env) ->
env =
Enum.map(env, fn {key, value} ->
{to_charlist(key), to_charlist(value)}
end)
{:ok, env}
_ ->
{:error, "`:env` must be a map or list of `{string, string}`"}
end
end
@spec normalize_stderr(stderr :: :console | :disable | :consume | nil) ::
{:ok, :console | :disable | :consume} | {:error, String.t()}
defp normalize_stderr(stderr) do
case stderr do
nil ->
{:ok, :console}
stderr when stderr in [:console, :disable, :consume] ->
{:ok, stderr}
_ ->
{:error, ":stderr must be an atom and one of :console, :disable, :consume"}
end
end
@spec validate_opts_fields(keyword) :: :ok | {:error, String.t()}
defp validate_opts_fields(opts) do
{_, additional_opts} = Keyword.split(opts, [:cd, :env, :stderr])
if Enum.empty?(additional_opts) do
:ok
else
{:error, "invalid opts: #{inspect(additional_opts)}"}
end
end
end
diff --git a/test/exile/sync_process_test.exs b/test/exile/sync_process_test.exs
index 164a69c..de7d441 100644
--- a/test/exile/sync_process_test.exs
+++ b/test/exile/sync_process_test.exs
@@ -1,54 +1,81 @@
defmodule Exile.SyncProcessTest do
use ExUnit.Case, async: false
alias Exile.Process
@bin Stream.repeatedly(fn -> "A" end) |> Enum.take(65_535) |> IO.iodata_to_binary()
test "memory leak" do
:timer.sleep(1000)
before_exec = :erlang.memory(:total)
{:ok, s} = Process.start_link(~w(cat))
Enum.each(1..500, fn _ ->
:ok = Process.write(s, @bin)
{:ok, _} = Process.read(s, 65_535)
end)
:timer.sleep(1000)
after_exec = :erlang.memory(:total)
assert_in_delta before_exec, after_exec, 1024 * 1024
assert :ok == Process.close_stdin(s)
assert {:ok, 0} == Process.await_exit(s, 500)
# Process.stop(s)
end
test "if watcher process exits on command exit" do
stop_all_children(Exile.WatcherSupervisor)
assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
assert {:ok, s} = Process.start_link(~w(cat))
# we spawn in background
:timer.sleep(200)
assert %{active: 1, workers: 1} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
Process.close_stdin(s)
assert {:ok, 0} = Process.await_exit(s, 500)
# Process.stop(s)
# wait for watcher to terminate
:timer.sleep(200)
assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
end
+ test "FDs are not leaked" do
+ before_count = opened_pipes()
+
+ for _ <- 1..100 do
+ {:ok, s} = Process.start_link(~w(date))
+ :ok = Process.close_stdin(s)
+ assert {:ok, _} = Process.read_any(s, 100)
+ assert :eof = Process.read_any(s, 100)
+ assert {:ok, 0} = Process.await_exit(s, 100)
+ end
+
+ # let the dust settle
+ :timer.sleep(2000)
+
+ after_count = opened_pipes()
+
+ assert before_count == after_count
+ end
+
+ defp opened_pipes do
+ {pipe_count, 0} = System.shell(~s(lsof -a -p #{:os.getpid()} | grep " PIPE " | wc -l))
+
+ pipe_count
+ |> String.trim()
+ |> String.to_integer()
+ end
+
defp stop_all_children(sup) do
DynamicSupervisor.which_children(sup)
|> Enum.each(fn {_, pid, _, _} ->
DynamicSupervisor.terminate_child(sup, pid)
end)
end
end
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Nov 25, 3:03 PM (1 d, 7 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
39935
Default Alt Text
(8 KB)
Attached To
Mode
R14 exile
Attached
Detach File
Event Timeline
Log In to Comment