Page MenuHomePhorge

No OneTemporary

Size
34 KB
Referenced Files
None
Subscribers
None
diff --git a/.github/workflows/elixir.yaml b/.github/workflows/elixir.yaml
index d56cce1..e94ace9 100644
--- a/.github/workflows/elixir.yaml
+++ b/.github/workflows/elixir.yaml
@@ -1,33 +1,35 @@
name: Elixir CI
on: push
jobs:
test:
runs-on: ubuntu-latest
name: OTP ${{matrix.otp}} / Elixir ${{matrix.elixir}}
strategy:
matrix:
include:
- elixir: 1.9.4
otp: 22.2
- elixir: 1.10.4
otp: 23.0
+ - elixir: 1.12.x
+ otp: 24.x
steps:
- uses: actions/checkout@v2
- - uses: actions/setup-elixir@v1
+ - uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}
- name: Install Dependencies
run: mix deps.get
- name: gcc version
run: gcc --version
- name: Compile
run: mix compile --force --warnings-as-errors
- name: Check format
run: mix format --check-formatted
# - name: Run credo
# run: mix credo
- name: Run Tests
run: mix test --exclude skip:true
diff --git a/c_src/exile.c b/c_src/exile.c
index fd1fa65..c4b4f14 100644
--- a/c_src/exile.c
+++ b/c_src/exile.c
@@ -1,391 +1,394 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "erl_nif.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#ifdef ERTS_DIRTY_SCHEDULERS
#define USE_DIRTY_IO ERL_NIF_DIRTY_JOB_IO_BOUND
#else
#define USE_DIRTY_IO 0
#endif
// #define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
enif_fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, \
__func__); \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
#else
#define debug(...)
#endif
#define error(...) \
do { \
enif_fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, \
__func__); \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
#define assert_argc(argc, count) \
if (argc != count) { \
error("number of arguments must be %d", count); \
return enif_make_badarg(env); \
}
static const int UNBUFFERED_READ = -1;
static const int PIPE_BUF_SIZE = 65535;
static const int FD_CLOSED = -1;
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_UNDEFINED;
static ERL_NIF_TERM ATOM_INVALID_FD;
static ERL_NIF_TERM ATOM_SELECT_CANCEL_ERROR;
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_SIGKILL;
static ERL_NIF_TERM ATOM_SIGTERM;
static void close_fd(int *fd) {
if (*fd != FD_CLOSED) {
close(*fd);
*fd = FD_CLOSED;
}
}
static int cancel_select(ErlNifEnv *env, int *fd) {
int ret;
if (*fd != FD_CLOSED) {
ret = enif_select(env, *fd, ERL_NIF_SELECT_STOP, fd, NULL, ATOM_UNDEFINED);
if (ret < 0)
perror("cancel_select()");
return ret;
}
return 0;
}
static void io_resource_dtor(ErlNifEnv *env, void *obj) {
debug("Exile io_resource_dtor called");
}
static void io_resource_stop(ErlNifEnv *env, void *obj, int fd,
int is_direct_call) {
close_fd(&fd);
debug("Exile io_resource_stop called %d", fd);
}
static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
int *fd = (int *)obj;
cancel_select(env, fd);
debug("Exile io_resource_down called");
}
-static ErlNifResourceTypeInit io_rt_init = {io_resource_dtor, io_resource_stop,
- io_resource_down};
+static ErlNifResourceTypeInit io_rt_init;
static ErlNifResourceType *FD_RT;
static inline ERL_NIF_TERM make_ok(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_OK, term);
}
static inline ERL_NIF_TERM make_error(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_ERROR, term);
}
/* time is assumed to be in microseconds */
static void notify_consumed_timeslice(ErlNifEnv *env, ErlNifTime start,
ErlNifTime stop) {
ErlNifTime pct;
pct = (ErlNifTime)((stop - start) / 10);
if (pct > 100)
pct = 100;
else if (pct == 0)
pct = 1;
enif_consume_timeslice(env, pct);
}
static int select_write(ErlNifEnv *env, int *fd) {
int ret =
enif_select(env, *fd, ERL_NIF_SELECT_WRITE, fd, NULL, ATOM_UNDEFINED);
if (ret != 0)
perror("select_write()");
return ret;
}
static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
assert_argc(argc, 2);
ErlNifTime start;
ssize_t size;
ErlNifBinary bin;
int write_errno;
int *fd;
start = enif_monotonic_time(ERL_NIF_USEC);
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (enif_inspect_binary(env, argv[1], &bin) != true)
return enif_make_badarg(env);
if (bin.size == 0)
return enif_make_badarg(env);
/* should we limit the bin.size here? */
size = write(*fd, bin.data, bin.size);
write_errno = errno;
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (size >= (ssize_t)bin.size) { // request completely satisfied
return make_ok(env, enif_make_int(env, size));
} else if (size >= 0) { // request partially satisfied
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, enif_make_int(env, size));
} else if (write_errno == EAGAIN || write_errno == EWOULDBLOCK) { // busy
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else {
perror("write()");
return make_error(env, enif_make_int(env, write_errno));
}
}
static int select_read(ErlNifEnv *env, int *fd) {
int ret =
enif_select(env, *fd, ERL_NIF_SELECT_READ, fd, NULL, ATOM_UNDEFINED);
if (ret != 0)
perror("select_read()");
return ret;
}
static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
assert_argc(argc, 1);
ERL_NIF_TERM term;
ErlNifPid pid;
int *fd;
int ret;
fd = enif_alloc_resource(FD_RT, sizeof(int));
if (!enif_get_int(env, argv[0], fd))
goto error_exit;
if (!enif_self(env, &pid)) {
error("failed get self pid");
goto error_exit;
}
ret = enif_monitor_process(env, fd, &pid, NULL);
if (ret < 0) {
error("no down callback is provided");
goto error_exit;
} else if (ret > 0) {
error("pid is not alive");
goto error_exit;
}
term = enif_make_resource(env, fd);
enif_release_resource(fd);
return make_ok(env, term);
error_exit:
enif_release_resource(fd);
return ATOM_ERROR;
}
static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
assert_argc(argc, 2);
ErlNifTime start;
int size, demand;
int *fd;
start = enif_monotonic_time(ERL_NIF_USEC);
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (!enif_get_int(env, argv[1], &demand))
return enif_make_badarg(env);
size = demand;
if (demand == UNBUFFERED_READ) {
size = PIPE_BUF_SIZE;
} else if (demand < 1) {
return enif_make_badarg(env);
} else if (demand > PIPE_BUF_SIZE) {
size = PIPE_BUF_SIZE;
}
unsigned char buf[size];
ssize_t result = read(*fd, buf, size);
int read_errno = errno;
ERL_NIF_TERM bin_term = 0;
if (result >= 0) {
/* no need to release this binary */
unsigned char *temp = enif_make_new_binary(env, result, &bin_term);
memcpy(temp, buf, result);
}
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (result >= 0) {
/* we do not 'select' if demand completely satisfied OR EOF OR its
* UNBUFFERED_READ */
if (result == demand || result == 0 || demand == UNBUFFERED_READ) {
return make_ok(env, bin_term);
} else { // demand partially satisfied
int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, bin_term);
}
} else {
if (read_errno == EAGAIN || read_errno == EWOULDBLOCK) { // busy
int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else {
perror("read()");
return make_error(env, enif_make_int(env, read_errno));
}
}
}
static ERL_NIF_TERM nif_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
assert_argc(argc, 1);
int *fd;
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (cancel_select(env, fd) < 0)
return make_error(env, ATOM_SELECT_CANCEL_ERROR);
close_fd(fd);
return ATOM_OK;
}
/*
 * nif_is_os_pid_alive(OsPid) -> true | false
 *
 * Probes the OS process with the null signal (kill(pid, 0)), which runs the
 * existence and permission checks without delivering a signal.
 *
 * Raises badarg when OsPid is not an integer or is not strictly positive.
 */
static ERL_NIF_TERM nif_is_os_pid_alive(ErlNifEnv *env, int argc,
                                        const ERL_NIF_TERM argv[]) {
  assert_argc(argc, 1);

  /* Read into a plain int first: pid_t is not guaranteed to be `int`, so
   * writing through a casted pointer is not portable. */
  int os_pid;

  /* pid <= 0 addresses process groups in kill(), never a single process */
  if (!enif_get_int(env, argv[0], &os_pid) || os_pid <= 0)
    return enif_make_badarg(env);

  /* EPERM means the process exists but we are not allowed to signal it, so
   * it still counts as alive. Any other failure is reported as not alive. */
  if (kill((pid_t)os_pid, 0) == 0 || errno == EPERM)
    return ATOM_TRUE;

  return ATOM_FALSE;
}
/*
 * nif_kill(OsPid, sigkill | sigterm) -> ok | {error, String}
 *
 * Sends SIGKILL or SIGTERM to the OS process OsPid.
 *
 * Raises badarg when OsPid is not a strictly positive integer or when the
 * signal atom is neither `sigkill` nor `sigterm`.
 */
static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc,
                             const ERL_NIF_TERM argv[]) {
  assert_argc(argc, 2);

  /* Read into a plain int and convert afterwards: pid_t is not guaranteed to
   * be `int`, so writing through a casted pointer is not portable. */
  int os_pid;
  int signum;

  /* kill() treats 0 and negative pids as "a process group" / "every
   * process"; this NIF is only meant to target one process. */
  if (!enif_get_int(env, argv[0], &os_pid) || os_pid <= 0)
    return enif_make_badarg(env);

  if (enif_compare(argv[1], ATOM_SIGKILL) == 0)
    signum = SIGKILL;
  else if (enif_compare(argv[1], ATOM_SIGTERM) == 0)
    signum = SIGTERM;
  else
    return enif_make_badarg(env);

  if (kill((pid_t)os_pid, signum) != 0) {
    perror("[exile] failed to send signal");
    return make_error(
        env, enif_make_string(env, "failed to send signal", ERL_NIF_LATIN1));
  }

  return ATOM_OK;
}
static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
+ io_rt_init.dtor = io_resource_dtor;
+ io_rt_init.stop = io_resource_stop;
+ io_rt_init.down = io_resource_down;
+
FD_RT =
enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
ATOM_TRUE = enif_make_atom(env, "true");
ATOM_FALSE = enif_make_atom(env, "false");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_UNDEFINED = enif_make_atom(env, "undefined");
ATOM_INVALID_FD = enif_make_atom(env, "invalid_fd_resource");
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_SELECT_CANCEL_ERROR = enif_make_atom(env, "select_cancel_error");
ATOM_SIGTERM = enif_make_atom(env, "sigterm");
ATOM_SIGKILL = enif_make_atom(env, "sigkill");
return 0;
}
static void on_unload(ErlNifEnv *env, void *priv) {
debug("exile unload");
enif_free(priv);
}
static ErlNifFunc nif_funcs[] = {
{"nif_read", 2, nif_read, USE_DIRTY_IO},
{"nif_create_fd", 1, nif_create_fd, USE_DIRTY_IO},
{"nif_write", 2, nif_write, USE_DIRTY_IO},
{"nif_close", 1, nif_close, USE_DIRTY_IO},
{"nif_is_os_pid_alive", 1, nif_is_os_pid_alive, USE_DIRTY_IO},
{"nif_kill", 2, nif_kill, USE_DIRTY_IO}};
ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index ee338b3..35ca40f 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,510 +1,519 @@
defmodule Exile.Process do
@moduledoc """
GenServer which wraps spawned external command.
`Exile.stream!/1` should be preferred over this. Use this only if you need more control over the life-cycle of IO streams and OS process.
## Comparison with Port
* it is demand driven. The user explicitly has to `read` the command output, and the progress of the external command is controlled using OS pipes. Exile never loads more output than we can consume, so we should never experience memory issues
* it can close stdin while consuming stdout
* tries to handle zombie processes by attempting to clean up the external process. Note that there is no middleware involved with exile, so it is still possible to end up with a zombie process.
Internally Exile uses non-blocking asynchronous system calls to interact with the external process. It does not use port's message based communication, instead uses raw stdio and NIF. Uses asynchronous system calls for IO. Most of the system calls are non-blocking, so it should not block the beam schedulers. Make use of dirty-schedulers for IO
"""
alias Exile.ProcessNif, as: Nif
require Logger
use GenServer
defmodule Error do
defexception [:message]
end
alias Exile.Process.Error
@default_opts [env: []]
@doc """
Starts `Exile.ProcessServer`
Starts external program using `cmd_with_args` with options `opts`
`cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
* `cd` - the directory to run the command in
* `env` - a list of tuples containing environment key-value. These can be accessed in the external program
"""
@type process :: pid
@spec start_link(nonempty_list(String.t()),
cd: String.t(),
env: [{String.t(), String.t()}]
) :: {:ok, process} | {:error, any()}
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
with {:ok, args} <- normalize_args(cmd_with_args, opts) do
GenServer.start(__MODULE__, args)
end
end
@doc """
Closes external program's input stream
"""
@spec close_stdin(process) :: :ok | {:error, any()}
def close_stdin(process) do
GenServer.call(process, :close_stdin, :infinity)
end
@doc """
Writes iodata `data` to program's input streams
This blocks when the pipe is full
"""
@spec write(process, binary) :: :ok | {:error, any()}
def write(process, iodata) do
GenServer.call(process, {:write, IO.iodata_to_binary(iodata)}, :infinity)
end
@doc """
Return bytes written by the program to output stream.

`size` is either a positive integer or `:unbuffered` (the default when `size` is omitted).

This blocks until the programs write and flush the output depending on the `size`
"""
@spec read(process, pos_integer() | :unbuffered) ::
        {:ok, iodata} | {:eof, iodata} | {:error, any()}
def read(process, size \\ :unbuffered)
    when (is_integer(size) and size > 0) or size == :unbuffered do
  GenServer.call(process, {:read, size}, :infinity)
end
@doc """
Sends signal to external program

Returns `:ok` when the signal was sent, `{:error, reason}` otherwise
"""
@spec kill(process, :sigkill | :sigterm) :: :ok | {:error, any()}
def kill(process, signal) when signal in [:sigkill, :sigterm] do
  GenServer.call(process, {:kill, signal}, :infinity)
end
@doc """
Waits for the program to terminate.

`timeout` is a number of milliseconds or `:infinity` (the default).

If the program terminates before timeout, it returns `{:ok, {:exit, exit_status}}` else returns `:timeout`
"""
@spec await_exit(process, timeout()) :: {:ok, {:exit, integer()}} | :timeout
def await_exit(process, timeout \\ :infinity) do
  # the GenServer call itself never times out; the server enforces `timeout`
  # with its own timer and replies `:timeout` when it fires
  GenServer.call(process, {:await_exit, timeout}, :infinity)
end
@doc """
Returns os pid of the command

Returns `{:ok, os_pid}` while the command is running, `:undefined` when the port
has no os pid, and `{:error, {:exit, status}}` once the command has exited
"""
@spec os_pid(process) :: {:ok, pos_integer()} | :undefined | {:error, any()}
def os_pid(process) do
  GenServer.call(process, :os_pid, :infinity)
end
@doc """
Stops the exile process, external program will be terminated in the background
"""
@spec stop(process) :: :ok
def stop(process), do: GenServer.call(process, :stop, :infinity)
## Server
defmodule Pending do
@moduledoc false
defstruct bin: [], remaining: 0, client_pid: nil
end
defstruct [
:args,
:errno,
:port,
:socket_path,
:stdin,
:stdout,
:context,
:status,
await: %{},
pending_read: nil,
pending_write: nil
]
alias __MODULE__
def init(args) do
state = %__MODULE__{
args: args,
errno: nil,
status: :init,
await: %{},
pending_read: %Pending{},
pending_write: %Pending{}
}
{:ok, state, {:continue, nil}}
end
def handle_continue(nil, state) do
Elixir.Process.flag(:trap_exit, true)
{:noreply, start_process(state)}
end
def handle_call(:stop, _from, state) do
# TODO: pending write and read should receive "stopped" return
# value instead of exit signal
case state.status do
{:exit, _} -> :ok
_ -> Port.close(state.port)
end
{:stop, :normal, :ok, state}
end
def handle_call(:close_stdin, _from, state) do
case state.status do
{:exit, _} -> {:reply, :ok, state}
_ -> do_close(state, :stdin)
end
end
def handle_call({:await_exit, _}, _from, %{status: {:exit, status}} = state) do
{:reply, {:ok, {:exit, status}}, state}
end
def handle_call({:await_exit, timeout}, from, %{status: :start} = state) do
tref =
if timeout != :infinity do
Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
else
nil
end
{:noreply, %Process{state | await: Map.put(state.await, from, tref)}}
end
def handle_call({:read, size}, from, state) do
if state.pending_read.client_pid do
{:reply, {:error, :pending_read}, state}
else
pending = %Pending{remaining: size, client_pid: from}
do_read(%Process{state | pending_read: pending})
end
end
def handle_call(_, _from, %{status: {:exit, status}} = state) do
{:reply, {:error, {:exit, status}}, state}
end
def handle_call({:write, binary}, from, state) do
cond do
!is_binary(binary) ->
{:reply, {:error, :not_binary}, state}
state.pending_write.client_pid ->
{:reply, {:error, :pending_write}, state}
true ->
pending = %Pending{bin: binary, client_pid: from}
do_write(%Process{state | pending_write: pending})
end
end
def handle_call(:os_pid, _from, state) do
case Port.info(state.port, :os_pid) do
{:os_pid, os_pid} ->
{:reply, {:ok, os_pid}, state}
:undefined ->
Logger.debug("Process not alive")
{:reply, :undefined, state}
end
end
def handle_call({:kill, signal}, _from, state) do
{:reply, signal(state.port, signal), state}
end
def handle_info({:await_exit_timeout, from}, state) do
GenServer.reply(from, :timeout)
{:noreply, %Process{state | await: Map.delete(state.await, from)}}
end
def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
def handle_info({:select, _read_resource, _ref, :ready_input}, state), do: do_read(state)
def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state),
do: handle_port_exit(exit_status, state)
def handle_info({:EXIT, port, :normal}, %{port: port} = state), do: {:noreply, state}
def handle_info({:EXIT, _, reason}, state), do: {:stop, reason, state}
# Records the command's exit status and answers every caller blocked in
# `await_exit/2`.
#
# `state.await` maps each waiting `from` to its timeout timer reference, or to
# `nil` when the caller asked for `:infinity`.
defp handle_port_exit(exit_status, state) do
  Enum.each(state.await, fn {from, tref} ->
    GenServer.reply(from, {:ok, {:exit, exit_status}})

    # the waiter has its answer, so its timeout timer is no longer needed
    if tref, do: Elixir.Process.cancel_timer(tref)
  end)

  # `await` must be reset inside the struct update. Written as a trailing
  # `await: %{}` after the struct it becomes a third tuple element (a keyword
  # list), which is not a valid GenServer return value, and the waiters that
  # were just answered would stay registered.
  {:noreply, %Process{state | status: {:exit, exit_status}, await: %{}}}
end
# Drives the pending write forward. Called when a write request arrives and
# again each time the stdin fd is reported as writable (`:ready_output`).

# Nothing left to write: acknowledge the caller and free the pending slot.
defp do_write(%Process{pending_write: %Pending{bin: <<>>}} = state) do
  GenServer.reply(state.pending_write.client_pid, :ok)
  {:noreply, %Process{state | pending_write: %Pending{}}}
end

defp do_write(%Process{pending_write: pending} = state) do
  case Nif.nif_write(state.stdin, pending.bin) do
    {:ok, size} when size < byte_size(pending.bin) ->
      # partial write: keep the unwritten tail and wait for the next
      # `:ready_output` notification
      rest = binary_part(pending.bin, size, byte_size(pending.bin) - size)
      {:noreply, %Process{state | pending_write: %Pending{pending | bin: rest}}}

    {:ok, _size} ->
      GenServer.reply(pending.client_pid, :ok)
      {:noreply, %Process{state | pending_write: %Pending{}}}

    {:error, :eagain} ->
      # pipe is full; the pending write is retried on `:ready_output`
      {:noreply, state}

    {:error, errno} ->
      GenServer.reply(pending.client_pid, {:error, errno})

      # The caller has been answered, so the pending slot must be released
      # (as the read path does on error). Otherwise every later write is
      # rejected with `{:error, :pending_write}`.
      {:noreply, %Process{state | pending_write: %Pending{}, errno: errno}}
  end
end
defp do_read(%Process{pending_read: %Pending{remaining: :unbuffered} = pending} = state) do
case Nif.nif_read(state.stdout, -1) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, []})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
GenServer.reply(pending.client_pid, {:ok, binary})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %Process{state | pending_read: %Pending{}, errno: errno}}
end
end
defp do_read(%Process{pending_read: pending} = state) do
case Nif.nif_read(state.stdout, pending.remaining) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, pending.bin})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
if byte_size(binary) < pending.remaining do
pending = %Pending{
pending
| bin: [pending.bin | binary],
remaining: pending.remaining - byte_size(binary)
}
{:noreply, %Process{state | pending_read: pending}}
else
GenServer.reply(pending.client_pid, {:ok, [state.pending_read.bin | binary]})
{:noreply, %Process{state | pending_read: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | pending_read: %Pending{}, errno: errno}}
end
end
# Closes the stdin (`type == :stdin`) or stdout (any other `type`) fd
# resource and builds the `handle_call` reply.
#
# A failed close is reported to the caller as `{:error, errno}` and recorded
# in `state.errno`. The server keeps running instead of raising a
# non-exception term.
defp do_close(state, type) do
  fd = if type == :stdin, do: state.stdin, else: state.stdout

  case Nif.nif_close(fd) do
    :ok ->
      {:reply, :ok, state}

    {:error, errno} ->
      {:reply, {:error, errno}, %Process{state | errno: errno}}
  end
end
defp normalize_cmd([cmd | _]) when is_binary(cmd) do
path = System.find_executable(cmd)
if path do
{:ok, to_charlist(path)}
else
{:error, "command not found: #{inspect(cmd)}"}
end
end
defp normalize_cmd(_cmd_with_args) do
{:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
end
defp normalize_cmd_args([_ | args]) do
if is_list(args) && Enum.all?(args, &is_binary/1) do
{:ok, Enum.map(args, &to_charlist/1)}
else
{:error, "command arguments must be list of strings. #{inspect(args)}"}
end
end
defp normalize_cd(nil), do: {:ok, ''}
defp normalize_cd(cd) do
if File.exists?(cd) && File.dir?(cd) do
{:ok, to_charlist(cd)}
else
{:error, "`:cd` must be valid directory path"}
end
end
defp normalize_env(nil), do: {:ok, []}
defp normalize_env(env) do
env =
Enum.map(env, fn {key, value} ->
{to_charlist(key), to_charlist(value)}
end)
{:ok, env}
end
defp validate_opts_fields(opts) do
{_, additional_opts} = Keyword.split(opts, [:cd, :env])
if Enum.empty?(additional_opts) do
:ok
else
{:error, "invalid opts: #{inspect(additional_opts)}"}
end
end
defp normalize_args(cmd_with_args, opts) do
with {:ok, cmd} <- normalize_cmd(cmd_with_args),
{:ok, args} <- normalize_cmd_args(cmd_with_args),
:ok <- validate_opts_fields(opts),
{:ok, cd} <- normalize_cd(opts[:cd]),
{:ok, env} <- normalize_env(opts[:env]) do
{:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env}}
end
end
@spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
defp exec(cmd_with_args, socket_path, env, cd) do
opts = []
opts = if cd, do: [{:cd, cd} | opts], else: []
opts = if env, do: [{:env, env} | opts], else: opts
opts =
[
:nouse_stdio,
:exit_status,
:binary,
args: [socket_path | cmd_with_args]
] ++ opts
Port.open({:spawn_executable, @spawner_path}, opts)
end
defp socket_path do
str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
path = Path.join(System.tmp_dir!(), str)
_ = :file.delete(path)
path
end
defp signal(port, sig) when sig in [:sigkill, :sigterm] do
case Port.info(port, :os_pid) do
{:os_pid, os_pid} -> Nif.nif_kill(os_pid, sig)
:undefined -> {:error, :process_not_alive}
end
end
defp start_process(%{args: %{cmd_with_args: cmd_with_args, cd: cd, env: env}} = state) do
path = socket_path()
{:ok, sock} = :socket.open(:local, :stream, :default)
try do
- {:ok, _} = :socket.bind(sock, %{family: :local, path: path})
+ :ok = socket_bind(sock, path)
:ok = :socket.listen(sock)
port = exec(cmd_with_args, path, env, cd)
{:os_pid, os_pid} = Port.info(port, :os_pid)
Exile.Watcher.watch(self(), os_pid, path)
{stdout, stdin} = receive_fds(sock)
%Process{
state
| port: port,
status: :start,
socket_path: path,
stdin: stdin,
stdout: stdout
}
after
:socket.close(sock)
end
end
@socket_timeout 2000
defp receive_fds(lsock) do
{:ok, sock} = :socket.accept(lsock, @socket_timeout)
try do
case :socket.recvmsg(sock, @socket_timeout) do
{:ok, msg} ->
%{
ctrl: [
%{
data: <<stdin_fd_int::native-32, stdout_fd_int::native-32, _::binary>>,
level: :socket,
type: :rights
}
]
} = msg
with {:ok, stdout} <- Nif.nif_create_fd(stdout_fd_int),
{:ok, stdin} <- Nif.nif_create_fd(stdin_fd_int) do
{stdout, stdin}
else
error ->
raise Error,
message: "Failed to create fd resources\n error: #{inspect(error)}"
end
{:error, reason} ->
raise Error,
message:
"Failed to receive stdin and stdout file descriptors\n error: #{inspect(reason)}"
end
after
:socket.close(sock)
end
end
+
+ defp socket_bind(sock, path) do
+ case :socket.bind(sock, %{family: :local, path: path}) do
+ :ok -> :ok
+ # for OTP version <= 24 compatibility
+ {:ok, _} -> :ok
+ other -> other
+ end
+ end
end
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index 7a77697..bf26c08 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,173 +1,170 @@
defmodule Exile.Stream do
@moduledoc """
Defines a `Exile.Stream` struct returned by `Exile.stream!/3`.
"""
alias Exile.Process
defmodule Sink do
defstruct [:process]
defimpl Collectable do
def into(%{process: process} = stream) do
collector_fun = fn
:ok, {:cont, x} ->
:ok = Process.write(process, x)
:ok, :done ->
:ok = Process.close_stdin(process)
stream
:ok, :halt ->
:ok = Process.close_stdin(process)
end
{:ok, collector_fun}
end
end
end
defstruct [:process, :stream_opts]
@type t :: %__MODULE__{}
@doc false
def __build__(cmd_with_args, opts) do
{stream_opts, process_opts} = Keyword.split(opts, [:exit_timeout, :chunk_size, :input])
with {:ok, stream_opts} <- normalize_stream_opts(stream_opts) do
{:ok, process} = Process.start_link(cmd_with_args, process_opts)
start_input_streamer(%Sink{process: process}, stream_opts.input)
%Exile.Stream{process: process, stream_opts: stream_opts}
else
{:error, error} -> raise ArgumentError, message: error
end
end
@doc false
defp start_input_streamer(sink, input) do
case input do
:no_input ->
:ok
{:enumerable, enum} ->
spawn_link(fn ->
Enum.into(enum, sink)
end)
{:collectable, func} ->
spawn_link(fn ->
func.(sink)
end)
end
end
defimpl Enumerable do
def reduce(%{process: process, stream_opts: stream_opts}, acc, fun) do
start_fun = fn -> :ok end
next_fun = fn :ok ->
case Process.read(process, stream_opts.chunk_size) do
{:eof, []} ->
{:halt, :normal}
{:eof, x} ->
# multiple reads on closed pipe always returns :eof
{[IO.iodata_to_binary(x)], :ok}
{:ok, x} ->
{[IO.iodata_to_binary(x)], :ok}
{:error, errno} ->
raise "Failed to read from the process. errno: #{errno}"
end
end
after_fun = fn exit_type ->
try do
# always close stdin before stopping to give the command a chance to exit properly
Process.close_stdin(process)
result = Process.await_exit(process, stream_opts.exit_timeout)
case {exit_type, result} do
{_, :timeout} ->
Process.kill(process, :sigkill)
raise "command fail to exit within timeout: #{stream_opts[:exit_timeout]}"
{:normal, {:ok, {:exit, 0}}} ->
:ok
{:normal, {:ok, error}} ->
raise "command exited with status: #{inspect(error)}"
{exit_type, error} ->
Process.kill(process, :sigkill)
-
- raise "command exited with exit_type: #{inspect(exit_type)}, error: #{
- inspect(error)
- }"
+ raise "command exited with exit_type: #{exit_type}, error: #{inspect(error)}"
end
after
Process.stop(process)
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
def count(_stream) do
{:error, __MODULE__}
end
def member?(_stream, _term) do
{:error, __MODULE__}
end
def slice(_stream) do
{:error, __MODULE__}
end
end
defp normalize_input(term) do
cond do
is_nil(term) ->
{:ok, :no_input}
!is_function(term) && Enumerable.impl_for(term) ->
{:ok, {:enumerable, term}}
is_function(term, 1) ->
{:ok, {:collectable, term}}
true ->
{:error, "`:input` must be either Enumerable or a function which accepts collectable"}
end
end
# Validates the `:chunk_size` stream option and converts it to the size
# argument handed to `Exile.Process.read/2`.
defp normalize_chunk_size(nil), do: {:ok, 65536}

# `Exile.Process.read/2` only accepts a positive integer or `:unbuffered`, so
# the user-facing `:no_buffering` is translated here; passing it through
# unchanged fails the guard of `read/2`.
defp normalize_chunk_size(:no_buffering), do: {:ok, :unbuffered}

defp normalize_chunk_size(chunk_size) when is_integer(chunk_size) and chunk_size > 0,
  do: {:ok, chunk_size}

defp normalize_chunk_size(_chunk_size),
  do: {:error, ":chunk_size must be either :no_buffering or a positive integer"}
# Validates the `:exit_timeout` stream option. `nil` (option not given) and
# `:infinity` both mean "wait forever".
defp normalize_exit_timeout(term) when term in [nil, :infinity], do: {:ok, :infinity}

# The value ends up as a timer duration, so a negative integer is rejected
# here rather than failing later inside the process server.
defp normalize_exit_timeout(term) when is_integer(term) and term >= 0, do: {:ok, term}

defp normalize_exit_timeout(_term),
  do: {:error, ":exit_timeout must be either :infinity or a non-negative integer"}
defp normalize_stream_opts(opts) when is_list(opts) do
with {:ok, input} <- normalize_input(opts[:input]),
{:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]),
{:ok, chunk_size} <- normalize_chunk_size(opts[:chunk_size]) do
{:ok, %{input: input, exit_timeout: exit_timeout, chunk_size: chunk_size}}
end
end
defp normalize_stream_opts(_), do: {:error, "stream_opts must be a keyword list"}
end
diff --git a/mix.exs b/mix.exs
index fcb71d2..648b1f5 100644
--- a/mix.exs
+++ b/mix.exs
@@ -1,56 +1,56 @@
defmodule Exile.MixProject do
use Mix.Project
def project do
[
app: :exile,
version: "0.1.0",
elixir: "~> 1.7",
start_permanent: Mix.env() == :prod,
compilers: [:elixir_make] ++ Mix.compilers(),
make_targets: ["all"],
make_clean: ["clean"],
deps: deps(),
# Package
package: package(),
description: description(),
# Docs
source_url: "https://github.com/akash-akya/exile",
homepage_url: "https://github.com/akash-akya/exile",
docs: [
main: "readme",
extras: ["README.md"]
]
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
mod: {Exile, []},
extra_applications: [:logger]
]
end
defp description do
"NIF based solution to interact with external programs with back-pressure"
end
defp package do
[
maintainers: ["Akash Hiremath"],
- licenses: ["MIT"],
+ licenses: ["Apache-2.0"],
links: %{GitHub: "https://github.com/akash-akya/exile"}
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:elixir_make, "~> 0.6", runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev}
]
end
end

File Metadata

Mime Type
text/x-diff
Expires
Thu, Nov 28, 2:54 AM (1 d, 16 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40807
Default Alt Text
(34 KB)

Event Timeline