diff --git a/lib/exile/process.ex b/lib/exile/process.ex new file mode 100644 index 0000000..f3fb851 --- /dev/null +++ b/lib/exile/process.ex @@ -0,0 +1,178 @@ +defmodule Exile.Process do + alias Exile.ProcessHelper + require Logger + use GenServer + + def start_link(cmd, args) do + GenServer.start(__MODULE__, %{cmd: cmd, args: args}) + end + + def close_stdin(process) do + GenServer.call(process, :close_stdin, :infinity) + end + + def write(process, binary) do + GenServer.call(process, {:write, binary}, :infinity) + end + + def read(process, bytes) do + GenServer.call(process, {:read, bytes}, :infinity) + end + + def os_pid(process) do + GenServer.call(process, :os_pid, :infinity) + end + + def kill(process, signal) when signal in [:sigkill, :sigterm] do + GenServer.call(process, {:kill, signal}, :infinity) + end + + def await_exit(process) do + GenServer.call(process, :await_exit, :infinity) + end + + ## Server + + def init(%{cmd: cmd, args: args}) do + path = :os.find_executable(to_charlist(cmd)) + + unless path do + raise "Command not found: #{cmd}" + end + + {:ok, %{cmd: path, args: args, read_acc: [], errno: nil}, {:continue, nil}} + end + + def handle_continue(nil, state) do + exec_args = Enum.map(state.args, &to_charlist/1) + + case ProcessHelper.exec_proc([state.cmd | exec_args]) do + {:ok, {pid, stdin, stdout}} -> + start_watcher(pid, stdin) + state = Map.merge(state, %{pid: pid, stdin: stdin, stdout: stdout}) + {:noreply, state} + + {:error, errno} -> + raise "Failed to start command: #{state.cmd}, errno: #{errno}" + end + end + + def handle_call({:write, binary}, from, state), do: do_write(state, from, binary) + + def handle_call({:read, bytes}, from, state), do: do_read(state, from, bytes) + + def handle_call(:os_pid, _from, state), do: {:reply, state.pid, state} + + def handle_call(:close_stdin, _from, state) do + case ProcessHelper.close_pipe(state.stdin) do + :ok -> + {:reply, :ok, state} + + {:error, errno} -> + {:reply, {:error, errno}, %{state | errno: errno}} + end + end + + def handle_call(:await_exit, from, state), do: do_await_exit(state, from) + + def handle_info({:read, bytes, from}, state), do: do_read(state, from, bytes) + def handle_info({:write, binary, from}, state), do: do_write(state, from, binary) + def handle_info({:await_exit, from}, state), do: do_await_exit(state, from) + + defp do_write(state, from, binary) do + case ProcessHelper.write_proc(state.stdin, binary) do + {:ok, bytes} -> + if bytes < IO.iodata_length(binary) do + binary = IO.iodata_to_binary(binary) + binary = binary_part(binary, bytes, IO.iodata_length(binary)) + Process.send_after(self(), {:write, binary, from}, 100) + else + GenServer.reply(from, :ok) + end + + {:noreply, state} + + {:error, errno} -> + GenServer.reply(from, {:error, errno}) + {:noreply, %{state | errno: errno}} + end + end + + defp do_read(state, from, bytes) do + case ProcessHelper.read_proc(state.stdout, bytes) do + {:ok, <<>>} -> + GenServer.reply(from, {:eof, state.read_acc}) + {:noreply, %{state | read_acc: []}} + + {:ok, binary} -> + if IO.iodata_length(binary) < bytes do + Process.send_after(self(), {:read, bytes - IO.iodata_length(binary), from}, 100) + {:noreply, %{state | read_acc: [state.read_acc | binary]}} + else + GenServer.reply(from, {:ok, [state.read_acc | binary]}) + {:noreply, %{state | read_acc: []}} + end + + # EAGAIN + {:error, 35} -> + Process.send_after(self(), {:read, bytes, from}, 100) + {:noreply, state} + + {:error, errno} -> + GenServer.reply(from, {:error, errno}) + {:noreply, %{state | errno: errno}} + end + end + + defp do_await_exit(%{pid: pid} = state, from) do + case ProcessHelper.wait_proc(pid) do + {^pid, status} -> + {:reply, {:ok, status}, state} + + {0, _} -> + Process.send_after(self(), {:await_exit, from}, 100) + {:noreply, state} + + {-1, status} -> + {:reply, {:error, status}, state} + end + end + + # def ps(pid) do + # {out, 0} = System.cmd("ps", [to_string(pid)]) + # out + # end + + @stdin_close_wait 3000 + @sigterm_wait 1000 + + # Try to gracefully terminate external proccess if the genserver associated with the process is killed + defp start_watcher(pid, stdin) do + parent = self() + + watcher_pid = + spawn(fn -> + ref = Process.monitor(parent) + send(parent, {self(), :done}) + + receive do + {:DOWN, ^ref, :process, ^parent, _reason} -> + with {:error, _} <- ProcessHelper.close_pipe(stdin), + _ <- :timer.sleep(@stdin_close_wait), + {p, _} <- ProcessHelper.wait_proc(pid), + false <- p != pid, + _ <- ProcessHelper.terminate_proc(pid), + _ <- :timer.sleep(@sigterm_wait), + {p, _} <- ProcessHelper.wait_proc(pid), + false <- p != pid, + _ <- ProcessHelper.kill_proc(pid) do + Logger.debug(fn -> "Killed process: #{pid}" end) + end + end + end) + + receive do + {^watcher_pid, :done} -> :ok + end + end +end diff --git a/lib/exile/process_helper.ex b/lib/exile/process_helper.ex index cc737d8..0c063a9 100644 --- a/lib/exile/process_helper.ex +++ b/lib/exile/process_helper.ex @@ -1,48 +1,39 @@ defmodule Exile.ProcessHelper do @on_load :load_nifs def load_nifs do :erlang.load_nif('./priv/exile_nif', 0) end - def exec(cmd, args) do - exec_proc([cmd | args]) - end - def exec_proc(_cmd) do raise "NIF exec_proc/0 not implemented" end def write_proc(_pipe, _bin) do raise "NIF write_proc/2 not implemented" end - def read_proc(_pipe) do + def read_proc(_pipe, _bytes) do raise "NIF read_proc/0 not implemented" end def close_pipe(_pipe) do raise "NIF close_pipe/1 not implemented" end def kill_proc(_pid) do raise "NIF kill_proc/1 not implemented" end def terminate_proc(_pid) do raise "NIF terminate_proc/1 not implemented" end def wait_proc(_pid) do raise "NIF wait_proc/1 not implemented" end def is_alive(_pid) do raise "NIF is_alive/1 not implemented" end - - def ps(pid) do - {out, 0} = System.cmd("ps", [to_string(pid)]) - out - end end diff --git a/lib/exile_nif.ex b/lib/exile_nif.ex deleted file mode 100644 index cc737d8..0000000 --- a/lib/exile_nif.ex +++ /dev/null @@ -1,48 +0,0 @@ -defmodule Exile.ProcessHelper do - @on_load :load_nifs - - def load_nifs do - :erlang.load_nif('./priv/exile_nif', 0) - end - - def exec(cmd, args) do - exec_proc([cmd | args]) - end - - def exec_proc(_cmd) do - raise "NIF exec_proc/0 not implemented" - end - - def write_proc(_pipe, _bin) do - raise "NIF write_proc/2 not implemented" - end - - def read_proc(_pipe) do - raise "NIF read_proc/0 not implemented" - end - - def close_pipe(_pipe) do - raise "NIF close_pipe/1 not implemented" - end - - def kill_proc(_pid) do - raise "NIF kill_proc/1 not implemented" - end - - def terminate_proc(_pid) do - raise "NIF terminate_proc/1 not implemented" - end - - def wait_proc(_pid) do - raise "NIF wait_proc/1 not implemented" - end - - def is_alive(_pid) do - raise "NIF is_alive/1 not implemented" - end - - def ps(pid) do - {out, 0} = System.cmd("ps", [to_string(pid)]) - out - end -end diff --git a/priv/exile_nif.c b/priv/exile_nif.c index 07123f8..468157f 100644 --- a/priv/exile_nif.c +++ b/priv/exile_nif.c @@ -1,256 +1,262 @@ #include "erl_nif.h" #include <errno.h> #include <fcntl.h> #include <signal.h> #include <stdbool.h> #include <stdio.h> #include <string.h> #include <sys/types.h> #include <unistd.h> #define ERL_TRUE enif_make_atom(env, "true") #define ERL_FALSE enif_make_atom(env, "false") #define ERL_OK(__TERM__) \ enif_make_tuple2(env, enif_make_atom(env, "ok"), __TERM__) #define ERL_ERROR(__TERM__) \ enif_make_tuple2(env, enif_make_atom(env, "error"), __TERM__) static const int PIPE_READ = 0; static const int PIPE_WRITE = 1; static const int MAX_ARGUMENTS = 20; static const int MAX_ARGUMENT_LEN = 1024; enum exec_status { SUCCESS, PIPE_CREATE_ERROR, PIPE_FLAG_ERROR, FORK_ERROR, PIPE_DUP_ERROR }; typedef struct ExecResults { enum exec_status status; int err; pid_t pid; int pipe_in; int pipe_out; } ExecResult; -static int set_flag(int fd) { - return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK | O_CLOEXEC); +static int set_flag(int fd, int flags) { + return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags); } static void close_all(int pipes[3][2]) { for (int i = 0; i < 3; i++) { if (pipes[i][PIPE_READ]) close(pipes[i][PIPE_READ]); if (pipes[i][PIPE_WRITE]) close(pipes[i][PIPE_WRITE]); } } #define RETURN_ERROR(__ERR__) \ do { \ fprintf(stderr, "error in start_proccess(), %s:%d %s\n", __FILE__, \ __LINE__, strerror(errno)); \ result.status = __ERR__; \ result.err = errno; \ close_all(pipes); \ return result; \ } while (0); static ExecResult start_proccess(char *args[]) { ExecResult result; pid_t pid; int pipes[3][2] = {{0, 0}, {0, 0}, {0, 0}}; if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1 || pipe(pipes[STDERR_FILENO]) == -1) { RETURN_ERROR(PIPE_CREATE_ERROR) } - if (set_flag(pipes[STDIN_FILENO][PIPE_READ]) < 0 || - set_flag(pipes[STDIN_FILENO][PIPE_WRITE]) < 0 || - set_flag(pipes[STDOUT_FILENO][PIPE_READ]) < 0 || - set_flag(pipes[STDOUT_FILENO][PIPE_WRITE]) < 0 || - set_flag(pipes[STDERR_FILENO][PIPE_READ]) < 0 || - set_flag(pipes[STDERR_FILENO][PIPE_WRITE]) < 0) { + if (set_flag(pipes[STDIN_FILENO][PIPE_READ], , O_CLOEXEC) < 0 || + set_flag(pipes[STDOUT_FILENO][PIPE_WRITE], O_CLOEXEC) < 0 || + set_flag(pipes[STDIN_FILENO][PIPE_WRITE], O_CLOEXEC | O_NONBLOCK) < 0 || + set_flag(pipes[STDOUT_FILENO][PIPE_READ], O_CLOEXEC | O_NONBLOCK) < 0 || + set_flag(pipes[STDERR_FILENO][PIPE_READ], O_CLOEXEC | O_NONBLOCK) < 0 || + set_flag(pipes[STDERR_FILENO][PIPE_WRITE], O_CLOEXEC | O_NONBLOCK) < 0) { RETURN_ERROR(PIPE_FLAG_ERROR) } + int fd; + switch (pid = fork()) { case -1: RETURN_ERROR(FORK_ERROR) case 0: close(STDIN_FILENO); close(STDOUT_FILENO); if (dup2(pipes[STDIN_FILENO][PIPE_READ], STDIN_FILENO) < 0) RETURN_ERROR(PIPE_DUP_ERROR) if (dup2(pipes[STDOUT_FILENO][PIPE_WRITE], STDOUT_FILENO) < 0) RETURN_ERROR(PIPE_DUP_ERROR) close_all(pipes); execvp(args[0], args); perror("execvp(): failed"); default: close(pipes[STDIN_FILENO][PIPE_READ]); close(pipes[STDOUT_FILENO][PIPE_WRITE]); result.pid = pid; result.pipe_in = pipes[STDIN_FILENO][PIPE_WRITE]; result.pipe_out = pipes[STDOUT_FILENO][PIPE_READ]; result.status = SUCCESS; return result; } } static ERL_NIF_TERM exec_proc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { char _temp[MAX_ARGUMENTS][MAX_ARGUMENT_LEN]; char *exec_args[MAX_ARGUMENTS + 1]; char *arg = NULL; unsigned int args_len; if (enif_get_list_length(env, argv[0], &args_len) != true) return enif_make_badarg(env); if (args_len > MAX_ARGUMENTS) return enif_make_badarg(env); ERL_NIF_TERM head, tail, list = argv[0]; for (int i = 0; i < args_len; i++) { if (enif_get_list_cell(env, list, &head, &tail) != true) return enif_make_badarg(env); if (enif_get_string(env, head, _temp[i], sizeof(_temp[i]), ERL_NIF_LATIN1) < 1) return enif_make_badarg(env); exec_args[i] = _temp[i]; list = tail; } exec_args[args_len] = NULL; ExecResult result = start_proccess(exec_args); ERL_NIF_TERM ret; switch (result.status) { case SUCCESS: ret = enif_make_tuple3(env, enif_make_int(env, result.pid), enif_make_int(env, result.pipe_in), enif_make_int(env, result.pipe_out)); return ERL_OK(ret); default: ret = enif_make_int(env, result.err); return ERL_ERROR(ret); } } static ERL_NIF_TERM write_proc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { int pipe_in; enif_get_int(env, argv[0], &pipe_in); if (argc != 2) enif_make_badarg(env); ErlNifBinary bin; bool is_success = enif_inspect_binary(env, argv[1], &bin); int result = write(pipe_in, bin.data, bin.size); if (result >= 0) { return ERL_OK(enif_make_int(env, result)); } else { perror("write()"); return ERL_ERROR(enif_make_int(env, errno)); } } static ERL_NIF_TERM close_pipe(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { int pipe; enif_get_int(env, argv[0], &pipe); int result = close(pipe); if (result == 0) { return enif_make_atom(env, "ok"); } else { perror("close()"); return ERL_ERROR(enif_make_int(env, errno)); } } static ERL_NIF_TERM read_proc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { - int pipe_out; + int pipe_out, bytes; enif_get_int(env, argv[0], &pipe_out); + enif_get_int(env, argv[1], &bytes); + + if (bytes > 65535 || bytes < 1) + enif_make_badarg(env); - char buf[65535]; + char buf[bytes]; int result = read(pipe_out, buf, sizeof(buf)); if (result >= 0) { ErlNifBinary bin; enif_alloc_binary(result, &bin); memcpy(bin.data, buf, result); return ERL_OK(enif_make_binary(env, &bin)); } else { perror("read()"); return ERL_ERROR(enif_make_int(env, errno)); } } static ERL_NIF_TERM is_alive(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { int pid; enif_get_int(env, argv[0], &pid); int result = kill(pid, 0); if (result == 0) { return ERL_TRUE; } else { return ERL_FALSE; } } static ERL_NIF_TERM terminate_proc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { int pid; enif_get_int(env, argv[0], &pid); return enif_make_int(env, kill(pid, SIGTERM)); } static ERL_NIF_TERM kill_proc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { int pid; enif_get_int(env, argv[0], &pid); return enif_make_int(env, kill(pid, SIGKILL)); } static ERL_NIF_TERM wait_proc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { int pid, status; enif_get_int(env, argv[0], &pid); int wpid = waitpid(pid, &status, WNOHANG); if (wpid != pid) { perror("waitpid()"); } return enif_make_tuple2(env, enif_make_int(env, wpid), enif_make_int(env, status)); } static ErlNifFunc nif_funcs[] = { {"exec_proc", 1, exec_proc}, {"write_proc", 2, write_proc}, - {"read_proc", 1, read_proc}, {"close_pipe", 1, close_pipe}, + {"read_proc", 2, read_proc}, {"close_pipe", 1, close_pipe}, {"terminate_proc", 1, terminate_proc}, {"wait_proc", 1, wait_proc}, {"kill_proc", 1, kill_proc}, {"is_alive", 1, is_alive}, }; -ERL_NIF_INIT(Elixir.Exile, nif_funcs, NULL, NULL, NULL, NULL) +ERL_NIF_INIT(Elixir.Exile.ProcessHelper, nif_funcs, NULL, NULL, NULL, NULL)