Page MenuHomePhorge

No OneTemporary

Size
130 KB
Referenced Files
None
Subscribers
None
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index fa418dc..199960c 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -1,35 +1,86 @@
-name: CI
+name: Elixir CI
on:
- push
- pull_request
jobs:
test:
- runs-on: ubuntu-latest
+ runs-on: ubuntu-20.04
name: Test - Elixir ${{matrix.elixir}} / OTP ${{matrix.otp}}
strategy:
matrix:
include:
- elixir: 1.10.x
otp: 22.x
- elixir: 1.12.x
otp: 23.x
- elixir: 1.14.x
otp: 24.x
steps:
- uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}
- uses: actions/checkout@v3
+ - name: Cache Dependencies
+ id: mix-cache
+ uses: actions/cache@v3
+ with:
+ path: |
+ deps
+ _build
+ key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }}
+
+ - name: Install Dependencies
+ if: steps.mix-cache.outputs.cache-hit != 'true'
+ run: |
+ mix deps.get
+ mix deps.compile
+
+ - run: gcc --version
+ - run: mix compile --warnings-as-errors
+ - run: mix test --exclude skip:true --trace
+
+ lint:
+ runs-on: ubuntu-22.04
+ name: Lint
+ strategy:
+ matrix:
+ include:
+ - elixir: 1.14.x
+ otp: 25.x
+ steps:
+ - uses: erlef/setup-beam@v1
+ with:
+ otp-version: ${{matrix.otp}}
+ elixir-version: ${{matrix.elixir}}
+
+ - uses: actions/checkout@v3
+
+ - name: Cache Dependencies
+ id: mix-cache
+ uses: actions/cache@v3
+ with:
+ path: |
+ deps
+ _build
+ key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }}
+
+ - name: Install Dependencies
+ if: steps.mix-cache.outputs.cache-hit != 'true'
+ run: |
+ mkdir -p priv/plts
+ mix deps.get
+ mix deps.compile
+ mix dialyzer --plt
+
- run: mix deps.get
- run: mix deps.unlock --check-unused
- run: mix format --check-formatted
- - run: gcc --version
- - run: mix compile --force --warnings-as-errors
- - run: mix test --exclude skip:true --trace
+ - run: mix credo --strict
+ - run: mix dialyzer --plt
diff --git a/c_src/exile.c b/c_src/exile.c
index ed493e3..5e1645c 100644
--- a/c_src/exile.c
+++ b/c_src/exile.c
@@ -1,351 +1,361 @@
#include "erl_nif.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "utils.h"
#ifdef ERTS_DIRTY_SCHEDULERS
#define USE_DIRTY_IO ERL_NIF_DIRTY_JOB_IO_BOUND
#else
#define USE_DIRTY_IO 0
#endif
static const int UNBUFFERED_READ = -1;
static const int PIPE_BUF_SIZE = 65535;
static const int FD_CLOSED = -1;
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_UNDEFINED;
static ERL_NIF_TERM ATOM_INVALID_FD;
static ERL_NIF_TERM ATOM_SELECT_CANCEL_ERROR;
static ERL_NIF_TERM ATOM_EAGAIN;
+static ERL_NIF_TERM ATOM_EPIPE;
-static ERL_NIF_TERM ATOM_SIGKILL;
static ERL_NIF_TERM ATOM_SIGTERM;
+static ERL_NIF_TERM ATOM_SIGKILL;
+static ERL_NIF_TERM ATOM_SIGPIPE;
static void close_fd(int *fd) {
if (*fd != FD_CLOSED) {
close(*fd);
*fd = FD_CLOSED;
}
}
static int cancel_select(ErlNifEnv *env, int *fd) {
int ret;
if (*fd != FD_CLOSED) {
ret = enif_select(env, *fd, ERL_NIF_SELECT_STOP, fd, NULL, ATOM_UNDEFINED);
if (ret < 0)
perror("cancel_select()");
return ret;
}
return 0;
}
static void io_resource_dtor(ErlNifEnv *env, void *obj) {
debug("Exile io_resource_dtor called");
}
static void io_resource_stop(ErlNifEnv *env, void *obj, int fd,
int is_direct_call) {
close_fd(&fd);
debug("Exile io_resource_stop called %d", fd);
}
static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
int *fd = (int *)obj;
cancel_select(env, fd);
debug("Exile io_resource_down called");
}
static ErlNifResourceTypeInit io_rt_init;
static ErlNifResourceType *FD_RT;
static inline ERL_NIF_TERM make_ok(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_OK, term);
}
static inline ERL_NIF_TERM make_error(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_ERROR, term);
}
/* time is assumed to be in microseconds */
static void notify_consumed_timeslice(ErlNifEnv *env, ErlNifTime start,
ErlNifTime stop) {
ErlNifTime pct;
pct = (ErlNifTime)((stop - start) / 10);
if (pct > 100)
pct = 100;
else if (pct == 0)
pct = 1;
enif_consume_timeslice(env, pct);
}
static int select_write(ErlNifEnv *env, int *fd) {
int ret =
enif_select(env, *fd, ERL_NIF_SELECT_WRITE, fd, NULL, ATOM_UNDEFINED);
if (ret != 0)
perror("select_write()");
return ret;
}
static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 2);
ErlNifTime start;
ssize_t size;
ErlNifBinary bin;
int write_errno;
int *fd;
start = enif_monotonic_time(ERL_NIF_USEC);
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (enif_inspect_binary(env, argv[1], &bin) != true)
return enif_make_badarg(env);
if (bin.size == 0)
return enif_make_badarg(env);
/* should we limit the bin.size here? */
size = write(*fd, bin.data, bin.size);
write_errno = errno;
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (size >= (ssize_t)bin.size) { // request completely satisfied
return make_ok(env, enif_make_int(env, size));
} else if (size >= 0) { // request partially satisfied
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, enif_make_int(env, size));
} else if (write_errno == EAGAIN || write_errno == EWOULDBLOCK) { // busy
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
+ } else if (write_errno == EPIPE) {
+ return make_error(env, ATOM_EPIPE);
} else {
perror("write()");
return make_error(env, enif_make_int(env, write_errno));
}
}
static int select_read(ErlNifEnv *env, int *fd) {
int ret =
enif_select(env, *fd, ERL_NIF_SELECT_READ, fd, NULL, ATOM_UNDEFINED);
if (ret != 0)
perror("select_read()");
return ret;
}
static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 1);
ERL_NIF_TERM term;
ErlNifPid pid;
int *fd;
int ret;
fd = enif_alloc_resource(FD_RT, sizeof(int));
if (!enif_get_int(env, argv[0], fd))
goto error_exit;
if (!enif_self(env, &pid)) {
error("failed get self pid");
goto error_exit;
}
ret = enif_monitor_process(env, fd, &pid, NULL);
if (ret < 0) {
error("no down callback is provided");
goto error_exit;
} else if (ret > 0) {
error("pid is not alive");
goto error_exit;
}
term = enif_make_resource(env, fd);
enif_release_resource(fd);
return make_ok(env, term);
error_exit:
enif_release_resource(fd);
return ATOM_ERROR;
}
static ERL_NIF_TERM read_fd(ErlNifEnv *env, int *fd, int max_size) {
if (max_size == UNBUFFERED_READ) {
max_size = PIPE_BUF_SIZE;
} else if (max_size < 1) {
return enif_make_badarg(env);
} else if (max_size > PIPE_BUF_SIZE) {
max_size = PIPE_BUF_SIZE;
}
ErlNifTime start = enif_monotonic_time(ERL_NIF_USEC);
unsigned char buf[max_size];
ssize_t result = read(*fd, buf, max_size);
int read_errno = errno;
ERL_NIF_TERM bin_term = 0;
if (result >= 0) {
/* no need to release this binary */
unsigned char *temp = enif_make_new_binary(env, result, &bin_term);
memcpy(temp, buf, result);
}
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (result >= 0) {
return make_ok(env, bin_term);
} else if (read_errno == EAGAIN || read_errno == EWOULDBLOCK) { // busy
int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
+ } else if (read_errno == EPIPE) {
+ return make_error(env, ATOM_EPIPE);
} else {
perror("read_fd()");
return make_error(env, enif_make_int(env, read_errno));
}
}
static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 2);
int max_size;
int *fd;
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (!enif_get_int(env, argv[1], &max_size))
return enif_make_badarg(env);
return read_fd(env, fd, max_size);
}
static ERL_NIF_TERM nif_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 1);
int *fd;
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (cancel_select(env, fd) < 0)
return make_error(env, ATOM_SELECT_CANCEL_ERROR);
close_fd(fd);
return ATOM_OK;
}
static ERL_NIF_TERM nif_is_os_pid_alive(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 1);
pid_t pid;
if (!enif_get_int(env, argv[0], (int *)&pid))
return enif_make_badarg(env);
int result = kill(pid, 0);
if (result == 0)
return ATOM_TRUE;
else
return ATOM_FALSE;
}
static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 2);
pid_t pid;
int ret;
// we should not assume pid type to be `int`?
if (!enif_get_int(env, argv[0], (int *)&pid))
return enif_make_badarg(env);
if (enif_compare(argv[1], ATOM_SIGKILL) == 0)
ret = kill(pid, SIGKILL);
else if (enif_compare(argv[1], ATOM_SIGTERM) == 0)
ret = kill(pid, SIGTERM);
- else
+ else if (enif_compare(argv[1], ATOM_SIGPIPE) == 0) {
+ ret = kill(pid, SIGPIPE);
+ } else
return enif_make_badarg(env);
if (ret != 0) {
perror("[exile] failed to send signal");
return make_error(
env, enif_make_string(env, "failed to send signal", ERL_NIF_LATIN1));
}
return ATOM_OK;
}
static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
io_rt_init.dtor = io_resource_dtor;
io_rt_init.stop = io_resource_stop;
io_rt_init.down = io_resource_down;
FD_RT =
enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
ATOM_TRUE = enif_make_atom(env, "true");
ATOM_FALSE = enif_make_atom(env, "false");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_UNDEFINED = enif_make_atom(env, "undefined");
ATOM_INVALID_FD = enif_make_atom(env, "invalid_fd_resource");
ATOM_EAGAIN = enif_make_atom(env, "eagain");
+ ATOM_EPIPE = enif_make_atom(env, "epipe");
ATOM_SELECT_CANCEL_ERROR = enif_make_atom(env, "select_cancel_error");
ATOM_SIGTERM = enif_make_atom(env, "sigterm");
ATOM_SIGKILL = enif_make_atom(env, "sigkill");
+ ATOM_SIGPIPE = enif_make_atom(env, "sigpipe");
return 0;
}
static void on_unload(ErlNifEnv *env, void *priv) {
debug("exile unload");
enif_free(priv);
}
static ErlNifFunc nif_funcs[] = {
{"nif_read", 2, nif_read, USE_DIRTY_IO},
{"nif_create_fd", 1, nif_create_fd, USE_DIRTY_IO},
{"nif_write", 2, nif_write, USE_DIRTY_IO},
{"nif_close", 1, nif_close, USE_DIRTY_IO},
{"nif_is_os_pid_alive", 1, nif_is_os_pid_alive, USE_DIRTY_IO},
{"nif_kill", 2, nif_kill, USE_DIRTY_IO}};
-ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
+ERL_NIF_INIT(Elixir.Exile.Process.Nif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
diff --git a/c_src/spawner.c b/c_src/spawner.c
index 104307c..6816368 100644
--- a/c_src/spawner.c
+++ b/c_src/spawner.c
@@ -1,252 +1,252 @@
#include <fcntl.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>
// these definitions are Linux only at the moment
#ifndef CMSG_LEN
socklen_t CMSG_LEN(size_t len) {
return (CMSG_DATA((struct cmsghdr *)NULL) - (unsigned char *)NULL) + len;
}
#endif
#ifndef CMSG_SPACE
socklen_t CMSG_SPACE(size_t len) {
struct msghdr msg;
struct cmsghdr cmsg;
msg.msg_control = &cmsg;
msg.msg_controllen =
~0; /* To maximize the chance that CMSG_NXTHDR won't return NULL */
cmsg.cmsg_len = CMSG_LEN(len);
return (unsigned char *)CMSG_NXTHDR(&msg, &cmsg) - (unsigned char *)&cmsg;
}
#endif
// #define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, __func__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
} while (0)
#else
#define debug(...)
#endif
#define error(...) \
do { \
fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, __func__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
} while (0)
static const int PIPE_READ = 0;
static const int PIPE_WRITE = 1;
/* We are choosing an exit code which is not reserved see:
* https://www.tldp.org/LDP/abs/html/exitcodes.html. */
static const int FORK_EXEC_FAILURE = 125;
static int set_flag(int fd, int flags) {
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
}
static int send_io_fds(int socket, int stdin_fd, int stdout_fd, int stderr_fd) {
struct msghdr msg = {0};
struct cmsghdr *cmsg;
int fds[3];
char buf[CMSG_SPACE(3 * sizeof(int))];
char dup[256];
struct iovec io;
memset(buf, '\0', sizeof(buf));
io.iov_base = &dup;
io.iov_len = sizeof(dup);
msg.msg_iov = &io;
msg.msg_iovlen = 1;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(3 * sizeof(int));
fds[0] = stdin_fd;
fds[1] = stdout_fd;
fds[2] = stderr_fd;
debug("stdout: %d, stderr: %d", stdout_fd, stderr_fd);
memcpy((int *)CMSG_DATA(cmsg), fds, 3 * sizeof(int));
if (sendmsg(socket, &msg, 0) < 0) {
debug("Failed to send message");
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
static void close_pipes(int pipes[3][2]) {
for (int i = 0; i < 3; i++) {
if (pipes[i][PIPE_READ] > 0)
close(pipes[i][PIPE_READ]);
if (pipes[i][PIPE_WRITE] > 0)
close(pipes[i][PIPE_WRITE]);
}
}
static int exec_process(char const *bin, char *const *args, int socket,
- bool use_stderr) {
+ bool enable_stderr) {
int pipes[3][2] = {{0, 0}, {0, 0}, {0, 0}};
int r_cmdin, w_cmdin, r_cmdout, w_cmdout, r_cmderr, w_cmderr;
int i;
if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1 ||
pipe(pipes[STDERR_FILENO]) == -1) {
perror("[spawner] failed to create pipes");
close_pipes(pipes);
return 1;
}
debug("created pipes");
r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
r_cmderr = pipes[STDERR_FILENO][PIPE_READ];
w_cmderr = pipes[STDERR_FILENO][PIPE_WRITE];
if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
set_flag(w_cmderr, O_CLOEXEC) < 0 ||
set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmderr, O_CLOEXEC | O_NONBLOCK) < 0) {
perror("[spawner] failed to set flags for pipes");
close_pipes(pipes);
return 1;
}
debug("set fd flags to pipes");
if (send_io_fds(socket, w_cmdin, r_cmdout, r_cmderr) != EXIT_SUCCESS) {
perror("[spawner] failed to send fd via socket");
close_pipes(pipes);
return 1;
}
debug("sent fds over UDS");
close(STDIN_FILENO);
close(w_cmdin);
if (dup2(r_cmdin, STDIN_FILENO) < 0) {
perror("[spawner] failed to dup to stdin");
_exit(FORK_EXEC_FAILURE);
}
close(STDOUT_FILENO);
close(r_cmdout);
if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
perror("[spawner] failed to dup to stdout");
_exit(FORK_EXEC_FAILURE);
}
- if (use_stderr) {
+ if (enable_stderr) {
close(STDERR_FILENO);
close(r_cmderr);
if (dup2(w_cmderr, STDERR_FILENO) < 0) {
perror("[spawner] failed to dup to stderr");
_exit(FORK_EXEC_FAILURE);
}
} else {
close(r_cmderr);
close(w_cmderr);
}
/* Close all non-standard io fds. Not closing STDERR */
for (i = STDERR_FILENO + 1; i < sysconf(_SC_OPEN_MAX); i++)
close(i);
debug("exec %s", bin);
execvp(bin, args);
perror("[spawner] execvp(): failed");
_exit(FORK_EXEC_FAILURE);
}
-static int spawn(const char *socket_path, const char *use_stderr_str,
+static int spawn(const char *socket_path, const char *enable_stderr_str,
const char *bin, char *const *args) {
int socket_fd;
struct sockaddr_un socket_addr;
- bool use_stderr;
+ bool enable_stderr;
- if (strcmp(use_stderr_str, "true") == 0) {
- use_stderr = true;
+ if (strcmp(enable_stderr_str, "true") == 0) {
+ enable_stderr = true;
} else {
- use_stderr = false;
+ enable_stderr = false;
}
socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (socket_fd == -1) {
debug("Failed to create socket");
return EXIT_FAILURE;
}
debug("created domain socket");
memset(&socket_addr, 0, sizeof(struct sockaddr_un));
socket_addr.sun_family = AF_UNIX;
strncpy(socket_addr.sun_path, socket_path, sizeof(socket_addr.sun_path) - 1);
if (connect(socket_fd, (struct sockaddr *)&socket_addr,
sizeof(struct sockaddr_un)) == -1) {
debug("Failed to connect to socket");
return EXIT_FAILURE;
}
debug("connected to exile");
- if (exec_process(bin, args, socket_fd, use_stderr) != 0)
+ if (exec_process(bin, args, socket_fd, enable_stderr) != 0)
return EXIT_FAILURE;
// we should never reach here
return EXIT_SUCCESS;
}
int main(int argc, const char *argv[]) {
int status, i;
const char **exec_argv;
if (argc < 4) {
debug("expected at least 3 arguments, passed %d", argc);
status = EXIT_FAILURE;
} else {
exec_argv = malloc((argc - 3 + 1) * sizeof(char *));
for (i = 3; i < argc; i++)
exec_argv[i - 3] = argv[i];
exec_argv[i - 3] = NULL;
- debug("socket path: %s use_stderr: %s bin: %s", argv[1], argv[2], argv[3]);
+ debug("socket path: %s enable_stderr: %s bin: %s", argv[1], argv[2], argv[3]);
status = spawn(argv[1], argv[2], argv[3], (char *const *)exec_argv);
}
exit(status);
}
diff --git a/lib/exile.ex b/lib/exile.ex
index e219435..55191af 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,105 +1,140 @@
defmodule Exile do
@moduledoc """
- Exile is an alternative for beam ports with back-pressure and non-blocking IO
+ Exile is an alternative for beam [ports](https://hexdocs.pm/elixir/Port.html)
+ with back-pressure and non-blocking IO.
+
+ ## Comparison with Port
+
+ * it is demand driven. User explicitly has to `read` the command
+ output, and the progress of the external command is controlled
+ using OS pipes. Exile never load more output than we can consume,
+ so we should never experience memory issues
+
+ * it can close stdin while consuming output
+
+ * tries to handle zombie process by attempting to cleanup
+ external process. Note that there is no middleware involved
+ with exile so it is still possible to endup with zombie process.
+
+ * selectively consume stdout and stderr
+
+ Internally Exile uses non-blocking asynchronous system calls
+ to interact with the external process. It does not use port's
+ message based communication, instead uses raw stdio and NIF.
+ Uses asynchronous system calls for IO. Most of the system
+ calls are non-blocking, so it should not block the beam
+ schedulers. Make use of dirty-schedulers for IO
"""
use Application
@doc false
def start(_type, _args) do
opts = [
name: Exile.WatcherSupervisor,
strategy: :one_for_one
]
- # we use DynamicSupervisor for cleaning up external processes on
+ # We use DynamicSupervisor for cleaning up external processes on
# :init.stop or SIGTERM
DynamicSupervisor.start_link(opts)
end
@doc """
- Runs the given command with arguments and return an Enumerable to read command output.
+ Runs the command with arguments and return an Enumerable to read the output.
- First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`.
+ First parameter must be a list containing command with arguments.
+ example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* Enumerable:
```
# List
Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
# Stream
Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65_536)) |> Enum.to_list()
```
* Collectable:
- If the input in a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. Return value of the function is ignored.
+ If the input in a function with arity 1, Exile will call that function
+ with a `Collectable` as the argument. The function must *push* input to this
+ collectable. Return value of the function is ignored.
```
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
- By defaults no input will be given to the command
+ By defaults no input is sent to the command
- * `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
+ * `exit_timeout` - Duration to wait for external program to exit after completion
+ before raising an error. Defaults to `:infinity`
- * `max_chunk_size` - Maximum size of each iodata chunk emitted by stream. Chunk size will be variable depending on the amount of data available at that time. Defaults to 65_535
+ * `max_chunk_size` - Maximum size of iodata chunk emitted by the stream.
+ Chunk size can be less than the `max_chunk_size` depending on the amount of
+ data available to be read. Defaults to `65_535`
- * `use_stderr` - When set to true, stream will contain stderr output along with stdout output. Element of the stream will be of the form `{:stdout, iodata}` or `{:stderr, iodata}` to differentiate different streams. Defaults to false. See example below
+ * `enable_stderr` - When set to true, output stream will contain stderr data along
+ with stdout. Stream data will be of the form `{:stdout, iodata}` or `{:stderr, iodata}`
+ to differentiate different streams. Defaults to false. See example below
+ * `ignore_epipe` - when set to true, `EPIPE` error during the write will be ignored.
+ This can be used to match UNIX shell default behaviour. EPIPE is the error raised
+ when the reader finishes the reading and close output pipe before command completes.
+ Defaults to `false`.
- All other options are passed to `Exile.Process.start_link/2`
+ Remaining options are passed to `Exile.Process.start_link/2`
### Examples
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
Stream with stderr
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1),
input: File.stream!("music_video.mkv", [], 65_535),
- use_stderr: true
+ enable_stderr: true
)
|> Stream.transform(
fn ->
File.open!("music.mp3", [:write, :binary])
end,
fn elem, file ->
case elem do
{:stdout, data} ->
:ok = IO.binwrite(file, data)
{:stderr, msg} ->
:ok = IO.write(msg)
end
{[], file}
end,
fn file ->
:ok = File.close(file)
end
)
|> Stream.run()
```
"""
@type collectable_func() :: (Collectable.t() -> any())
@spec stream!(nonempty_list(String.t()),
input: Enum.t() | collectable_func(),
exit_timeout: timeout(),
max_chunk_size: pos_integer()
) :: Exile.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index 1935b3d..36e4163 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,674 +1,564 @@
defmodule Exile.Process do
@moduledoc """
GenServer which wraps spawned external command.
- `Exile.stream!/1` should be preferred over using this. Use this only if you need more control over the life-cycle of IO streams and OS process.
+ `Exile.stream!/1` should be preferred over using this. Use this only
+ if you need more control over the life-cycle of IO streams and OS
+ process.
## Comparison with Port
- * it is demand driven. User explicitly has to `read` the command output, and the progress of the external command is controlled using OS pipes. Exile never load more output than we can consume, so we should never experience memory issues
+ * it is demand driven. User explicitly has to `read` the command
+ output, and the progress of the external command is controlled
+ using OS pipes. Exile never load more output than we can consume,
+ so we should never experience memory issues
+
* it can close stdin while consuming output
- * tries to handle zombie process by attempting to cleanup external process. Note that there is no middleware involved with exile so it is still possible to endup with zombie process.
- * selectively consume stdout and stderr streams
- Internally Exile uses non-blocking asynchronous system calls to interact with the external process. It does not use port's message based communication, instead uses raw stdio and NIF. Uses asynchronous system calls for IO. Most of the system calls are non-blocking, so it should not block the beam schedulers. Make use of dirty-schedulers for IO
+ * tries to handle zombie process by attempting to cleanup
+ external process. Note that there is no middleware involved
+ with exile so it is still possible to endup with zombie process.
+
+ * selectively consume stdout and stderr
+
+ Internally Exile uses non-blocking asynchronous system calls
+ to interact with the external process. It does not use port's
+ message based communication, instead uses raw stdio and NIF.
+ Uses asynchronous system calls for IO. Most of the system
+ calls are non-blocking, so it should not block the beam
+ schedulers. Make use of dirty-schedulers for IO
"""
use GenServer
- alias __MODULE__
- alias Exile.ProcessNif, as: Nif
- require Logger
+ alias Exile.Process.Exec
+ alias Exile.Process.Nif
+ alias Exile.Process.Operations
+ alias Exile.Process.Pipe
+ alias Exile.Process.State
- defstruct [
- :args,
- :errno,
- :port,
- :socket_path,
- :stdin,
- :stdout,
- :stderr,
- :status,
- :use_stderr,
- :await,
- :read_stdout,
- :read_stderr,
- :read_any,
- :write_stdin
- ]
-
- defmodule Pending do
- @moduledoc false
- defstruct bin: [], size: 0, client_pid: nil
- end
+ require Logger
defmodule Error do
defexception [:message]
end
+ @type pipe_name :: :stdin | :stdout | :stderr
+
+ @type t :: %__MODULE__{
+ monitor_ref: reference(),
+ exit_ref: reference(),
+ pid: pid | nil,
+ owner: pid
+ }
+
+ defstruct [:monitor_ref, :exit_ref, :pid, :owner]
+
@type exit_status :: non_neg_integer
- @default_opts [env: [], use_stderr: false]
+ @default_opts [env: [], enable_stderr: false]
@default_buffer_size 65_535
+ @os_signal_timeout 1000
@doc """
- Starts `Exile.ProcessServer`
+ Starts `Exile.Process`
Starts external program using `cmd_with_args` with options `opts`
`cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
* `cd` - the directory to run the command in
* `env` - a list of tuples containing environment key-value. These can be accessed in the external program
- * `use_stderr` - when set to true, exile connects stderr stream for the consumption. Defaults to false. Note that when set to true stderr must be consumed to avoid external program from blocking
+ * `enable_stderr` - when set to true, Exile connects stderr pipe for the consumption. Defaults to false. Note that when set to true stderr must be consumed to avoid external program from blocking
"""
- @type process :: pid
@spec start_link(nonempty_list(String.t()),
cd: String.t(),
env: [{String.t(), String.t()}],
- use_stderr: boolean()
- ) :: {:ok, process} | {:error, any()}
+ enable_stderr: boolean()
+ ) :: {:ok, t} | {:error, any()}
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
- with {:ok, args} <- normalize_args(cmd_with_args, opts) do
- GenServer.start(__MODULE__, args)
+ case Exec.normalize_exec_args(cmd_with_args, opts) do
+ {:ok, args} ->
+ owner = self()
+ exit_ref = make_ref()
+ args = Map.merge(args, %{owner: owner, exit_ref: exit_ref})
+ {:ok, pid} = GenServer.start_link(__MODULE__, args)
+ ref = Process.monitor(pid)
+
+ process = %__MODULE__{
+ pid: pid,
+ monitor_ref: ref,
+ exit_ref: exit_ref,
+ owner: owner
+ }
+
+ {:ok, process}
+
+ error ->
+ error
end
end
@doc """
- Closes external program's input stream
+ Closes external program's standard input pipe (stdin)
"""
- @spec close_stdin(process) :: :ok | {:error, any()}
+ @spec close_stdin(t) :: :ok | {:error, any()}
def close_stdin(process) do
- GenServer.call(process, :close_stdin, :infinity)
+ GenServer.call(process.pid, {:close_pipe, :stdin}, :infinity)
+ end
+
+ @doc """
+ Closes external program's standard output pipe (stdout)
+ """
+ @spec close_stdout(t) :: :ok | {:error, any()}
+ def close_stdout(process) do
+ GenServer.call(process.pid, {:close_pipe, :stdout}, :infinity)
+ end
+
+ @doc """
+ Closes external program's standard error pipe (stderr)
+ """
+ @spec close_stderr(t) :: :ok | {:error, any()}
+ def close_stderr(process) do
+ GenServer.call(process.pid, {:close_pipe, :stderr}, :infinity)
end
@doc """
- Writes iodata `data` to program's input streams
+ Writes iodata `data` to program's standard input pipe
This blocks when the pipe is full
"""
- @spec write(process, binary) :: :ok | {:error, any()}
+ @spec write(t, binary) :: :ok | {:error, any()}
def write(process, iodata) do
- GenServer.call(process, {:write_stdin, IO.iodata_to_binary(iodata)}, :infinity)
+ binary = IO.iodata_to_binary(iodata)
+ GenServer.call(process.pid, {:write_stdin, binary}, :infinity)
end
@doc """
- Returns bytes from executed command's stdout stream with maximum size `max_size`.
+ Returns bytes from executed command's stdout with maximum size `max_size`.
- Blocks if no bytes are written to stdout stream yet. And returns as soon as bytes are available
+ Blocks if no bytes are written to stdout yet. And returns as soon as bytes are available
"""
- @spec read(process, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
+ @spec read(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
def read(process, max_size \\ @default_buffer_size)
when is_integer(max_size) and max_size > 0 do
- GenServer.call(process, {:read_stdout, max_size}, :infinity)
+ GenServer.call(process.pid, {:read_stdout, max_size}, :infinity)
end
@doc """
- Returns bytes from executed command's stderr stream with maximum size `max_size`.
+ Returns bytes from executed command's stderr with maximum size `max_size`.
- Blocks if no bytes are written to stdout stream yet. And returns as soon as bytes are available
+ Blocks if no bytes are written to stderr yet. And returns as soon as bytes are available
"""
- @spec read_stderr(process, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
+ @spec read_stderr(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
def read_stderr(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
- GenServer.call(process, {:read_stderr, size}, :infinity)
+ GenServer.call(process.pid, {:read_stderr, size}, :infinity)
end
@doc """
- Returns bytes from either stdout or stderr stream with maximum size `max_size` whichever is available.
+ Returns bytes from either stdout or stderr with maximum size `max_size` whichever is available.
- Blocks if no bytes are written to stdout/stderr stream yet. And returns as soon as bytes are available
+ Blocks if no bytes are written to stdout/stderr yet. And returns as soon as bytes are available
"""
- @spec read_any(process, pos_integer()) ::
+ @spec read_any(t, pos_integer()) ::
{:ok, {:stdout, iodata}} | {:ok, {:stderr, iodata}} | :eof | {:error, any()}
def read_any(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
- GenServer.call(process, {:read_any, size}, :infinity)
+ GenServer.call(process.pid, {:read_stdout_or_stderr, size}, :infinity)
+ end
+
+ @spec change_pipe_owner(t, pipe_name, pid) :: :ok | {:error, any()}
+ def change_pipe_owner(process, pipe_name, target_owner_pid) do
+ GenServer.call(
+ process.pid,
+ {:change_pipe_owner, pipe_name, target_owner_pid},
+ :infinity
+ )
end
@doc """
Sends signal to external program
"""
- @spec kill(process, :sigkill | :sigterm) :: :ok
+ @spec kill(t, :sigkill | :sigterm) :: :ok
def kill(process, signal) when signal in [:sigkill, :sigterm] do
- GenServer.call(process, {:kill, signal}, :infinity)
+ GenServer.call(process.pid, {:kill, signal}, :infinity)
end
@doc """
Waits for the program to terminate.
- If the program terminates before timeout, it returns `{:ok, {:exit, exit_status}}` else returns `:timeout`
+ If the program terminates before timeout, it returns `{:ok, exit_status}`
"""
- @spec await_exit(process, timeout :: timeout()) :: {:ok, {:exit, exit_status}} | :timeout
- def await_exit(process, timeout \\ :infinity) do
- GenServer.call(process, {:await_exit, timeout}, :infinity)
+ @spec await_exit(t, timeout :: timeout()) :: {:ok, exit_status}
+ def await_exit(process, timeout \\ 5000) do
+ %__MODULE__{
+ monitor_ref: monitor_ref,
+ exit_ref: exit_ref,
+ owner: owner,
+ pid: pid
+ } = process
+
+ if self() != owner do
+ raise ArgumentError,
+ "task #{inspect(process)} exit status can only be queried by owner but was queried from #{inspect(self())}"
+ end
+
+ graceful_exit_timeout =
+ if timeout == :infinity do
+ :infinity
+ else
+ # process exit steps should finish before receive timeout exceeds
+ # receive timeout is max allowed time for the `await_exit` call to block
+ max(0, timeout - 100)
+ end
+
+ :ok = GenServer.cast(pid, {:prepare_exit, owner, graceful_exit_timeout})
+
+ receive do
+ {^exit_ref, exit_status} ->
+ exit_status
+
+ {:DOWN, ^monitor_ref, _, _proc, reason} ->
+ exit({reason, {__MODULE__, :await_exit, [process, timeout]}})
+ after
+ # ideally we should never this this case since the process must
+ # be terminated before the timeout and we should have received
+ # `DOWN` message
+ timeout ->
+ exit({:timeout, {__MODULE__, :await_exit, [process, timeout]}})
+ end
end
@doc """
Returns OS pid of the command
"""
- @spec os_pid(process) :: pos_integer()
+ @spec os_pid(t) :: pos_integer()
def os_pid(process) do
- GenServer.call(process, :os_pid, :infinity)
+ GenServer.call(process.pid, :os_pid, :infinity)
end
- @doc """
- Stops the exile process, external program will be terminated in the background
- """
- @spec stop(process) :: :ok
- def stop(process), do: GenServer.call(process, :stop, :infinity)
-
## Server
+ @impl true
def init(args) do
- {use_stderr, args} = Map.pop(args, :use_stderr)
+ {enable_stderr, args} = Map.pop(args, :enable_stderr)
+ {owner, args} = Map.pop!(args, :owner)
+ {exit_ref, args} = Map.pop!(args, :exit_ref)
- state = %__MODULE__{
+ state = %State{
args: args,
- errno: nil,
+ owner: owner,
status: :init,
- await: %{},
- use_stderr: use_stderr,
- read_stdout: %Pending{},
- read_stderr: %Pending{},
- read_any: %Pending{},
- write_stdin: %Pending{}
+ enable_stderr: enable_stderr,
+ operations: Operations.new(),
+ exit_ref: exit_ref,
+ monitor_ref: Process.monitor(owner)
}
{:ok, state, {:continue, nil}}
end
+ @impl true
def handle_continue(nil, state) do
- Elixir.Process.flag(:trap_exit, true)
- {:noreply, start_process(state)}
+ {:noreply, exec(state)}
end
- def handle_call(:stop, _from, state) do
- # TODO: pending write and read should receive "stopped" return
- # value instead of exit signal
- {:stop, :normal, :ok, state}
- end
+ @impl true
+ def handle_cast({:prepare_exit, caller, timeout}, state) do
+ state =
+ Enum.reduce(state.pipes, state, fn {_pipe_name, pipe}, state ->
+ case Pipe.close(pipe, caller) do
+ {:ok, pipe} ->
+ {:ok, state} = State.put_pipe(state, pipe.name, pipe)
+ state
- def handle_call(:close_stdin, _from, state) do
- case state.status do
- {:exit, _} -> {:reply, :ok, state}
- _ -> do_close(state, :stdin)
+ {:error, _} ->
+ state
+ end
+ end)
+
+ case maybe_shutdown(state) do
+ {:stop, :normal, state} ->
+ {:stop, :normal, state}
+
+ {:noreply, state} ->
+ if timeout == :infinity do
+ {:noreply, state}
+ else
+ total_stages = 3
+ stage_timeout = div(timeout, total_stages)
+ handle_info({:prepare_exit, :normal_exit, stage_timeout}, state)
+ end
end
end
- def handle_call({:await_exit, _}, _from, %{status: {:exit, status}} = state) do
- {:reply, {:ok, {:exit, status}}, state}
+ @impl true
+ def handle_call({:change_pipe_owner, pipe_name, new_owner}, _from, state) do
+ with {:ok, pipe} <- State.pipe(state, pipe_name),
+ {:ok, new_pipe} <- Pipe.set_owner(pipe, new_owner),
+ {:ok, state} <- State.put_pipe(state, pipe_name, new_pipe) do
+ {:reply, :ok, state}
+ else
+ {:error, _} = error ->
+ {:reply, error, state}
+ end
end
- def handle_call({:await_exit, timeout}, from, %{status: :start} = state) do
- tref =
- if timeout != :infinity do
- Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
- else
- nil
- end
-
- {:noreply, %Process{state | await: Map.put(state.await, from, tref)}}
+ def handle_call({:close_pipe, pipe_name}, {caller, _} = from, state) do
+ with {:ok, pipe} <- State.pipe(state, pipe_name),
+ {:ok, new_pipe} <- Pipe.close(pipe, caller),
+ :ok <- GenServer.reply(from, :ok),
+ {:ok, new_state} <- State.put_pipe(state, pipe_name, new_pipe) do
+ maybe_shutdown(new_state)
+ else
+ {:error, _} = ret ->
+ {:reply, ret, state}
+ end
end
def handle_call({:read_stdout, size}, from, state) do
- case can_read?(state, :stdout) do
- :ok ->
- pending = %Pending{size: size, client_pid: from}
- do_read_stdout(%Process{state | read_stdout: pending})
-
- error ->
- GenServer.reply(from, error)
+ case Operations.read(state, {:read_stdout, from, size}) do
+ {:noreply, state} ->
{:noreply, state}
+
+ ret ->
+ {:reply, ret, state}
end
end
def handle_call({:read_stderr, size}, from, state) do
- case can_read?(state, :stderr) do
- :ok ->
- pending = %Pending{size: size, client_pid: from}
- do_read_stderr(%Process{state | read_stderr: pending})
-
- error ->
- GenServer.reply(from, error)
+ case Operations.read(state, {:read_stderr, from, size}) do
+ {:noreply, state} ->
{:noreply, state}
+
+ ret ->
+ {:reply, ret, state}
end
end
- def handle_call({:read_any, size}, from, state) do
- case can_read?(state, :any) do
- :ok ->
- pending = %Pending{size: size, client_pid: from}
- do_read_any(%Process{state | read_any: pending})
-
- error ->
- GenServer.reply(from, error)
+ def handle_call({:read_stdout_or_stderr, size}, from, state) do
+ case Operations.read_any(state, {:read_stdout_or_stderr, from, size}) do
+ {:noreply, state} ->
{:noreply, state}
- end
- end
- def handle_call(_, _from, %{status: {:exit, status}} = state) do
- {:reply, {:error, {:exit, status}}, state}
+ ret ->
+ {:reply, ret, state}
+ end
end
def handle_call({:write_stdin, binary}, from, state) do
- cond do
- !is_binary(binary) ->
- {:reply, {:error, :not_binary}, state}
-
- state.write_stdin.client_pid ->
- {:reply, {:error, :write_stdin}, state}
+ case Operations.write(state, {:write_stdin, from, binary}) do
+ {:noreply, state} ->
+ {:noreply, state}
- true ->
- pending = %Pending{bin: binary, client_pid: from}
- do_write(%Process{state | write_stdin: pending})
+ ret ->
+ {:reply, ret, state}
end
end
def handle_call(:os_pid, _from, state) do
case Port.info(state.port, :os_pid) do
{:os_pid, os_pid} ->
{:reply, {:ok, os_pid}, state}
nil ->
Logger.debug("Process not alive")
{:reply, :undefined, state}
end
end
def handle_call({:kill, signal}, _from, state) do
{:reply, signal(state.port, signal), state}
end
- def handle_info({:await_exit_timeout, from}, state) do
- GenServer.reply(from, :timeout)
- {:noreply, %Process{state | await: Map.delete(state.await, from)}}
- end
-
- def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
-
- def handle_info({:select, read_resource, _ref, :ready_input}, state) do
+ @impl true
+ def handle_info({:prepare_exit, current_stage, timeout}, %{status: status, port: port} = state) do
cond do
- state.read_any.client_pid ->
- stream =
- cond do
- read_resource == state.stdout -> :stdout
- read_resource == state.stderr -> :stderr
- end
-
- do_read_any(state, stream)
-
- state.read_stdout.client_pid && read_resource == state.stdout ->
- do_read_stdout(state)
-
- state.read_stderr.client_pid && read_resource == state.stderr ->
- do_read_stderr(state)
-
- true ->
+ status != :running ->
{:noreply, state}
- end
- end
-
- def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state),
- do: handle_port_exit(exit_status, state)
-
- def handle_info({:EXIT, port, :normal}, %{port: port} = state), do: {:noreply, state}
-
- def handle_info({:EXIT, _, reason}, state), do: {:stop, reason, state}
-
- defp handle_port_exit(exit_status, state) do
- Enum.each(state.await, fn {from, tref} ->
- GenServer.reply(from, {:ok, {:exit, exit_status}})
- if tref do
- Elixir.Process.cancel_timer(tref)
- end
- end)
-
- {:noreply, %Process{state | status: {:exit, exit_status}, await: %{}}}
- end
-
- defmacrop eof, do: {:ok, <<>>}
- defmacrop eagain, do: {:error, :eagain}
-
- defp do_write(%Process{write_stdin: %Pending{bin: <<>>}} = state) do
- reply_action(state, :write_stdin, :ok)
- end
-
- defp do_write(%Process{write_stdin: pending} = state) do
- bin_size = byte_size(pending.bin)
-
- case Nif.nif_write(state.stdin, pending.bin) do
- {:ok, size} when size < bin_size ->
- binary = binary_part(pending.bin, size, bin_size - size)
- noreply_action(%{state | write_stdin: %Pending{pending | bin: binary}})
-
- {:ok, _size} ->
- reply_action(state, :write_stdin, :ok)
-
- eagain() ->
- noreply_action(state)
-
- {:error, errno} ->
- reply_action(%Process{state | errno: errno}, :write_stdin, {:error, errno})
- end
- end
-
- defp do_read_stdout(%Process{read_stdout: pending} = state) do
- case Nif.nif_read(state.stdout, pending.size) do
- eof() ->
- reply_action(state, :read_stdout, :eof)
+ current_stage == :normal_exit ->
+ Elixir.Process.send_after(self(), {:prepare_exit, :sigterm, timeout}, timeout)
+ {:noreply, state}
- {:ok, binary} ->
- reply_action(state, :read_stdout, {:ok, binary})
+ current_stage == :sigterm ->
+ signal(port, :sigterm)
+ Elixir.Process.send_after(self(), {:prepare_exit, :sigkill, timeout}, timeout)
+ {:noreply, state}
- eagain() ->
- noreply_action(state)
+ current_stage == :sigkill ->
+ signal(port, :sigkill)
+ Elixir.Process.send_after(self(), {:prepare_exit, :stop, timeout}, timeout)
+ {:noreply, state}
- {:error, errno} ->
- reply_action(%Process{state | errno: errno}, :read_stdout, {:error, errno})
+ # this should never happen, since sigkill signal can not be ignored by the OS process
+ current_stage == :stop ->
+ {:stop, :sigkill_timeout, state}
end
end
- defp do_read_stderr(%Process{read_stderr: pending} = state) do
- case Nif.nif_read(state.stderr, pending.size) do
- eof() ->
- reply_action(state, :read_stderr, :eof)
+ def handle_info({:select, write_resource, _ref, :ready_output}, state) do
+ :stdin = State.pipe_name_for_fd(state, write_resource)
- {:ok, binary} ->
- reply_action(state, :read_stderr, {:ok, binary})
+ with {:ok, {:write_stdin, from, _bin} = operation, state} <-
+ State.pop_operation(state, :write_stdin) do
+ case Operations.write(state, operation) do
+ {:noreply, state} ->
+ {:noreply, state}
- eagain() ->
- noreply_action(state)
-
- {:error, errno} ->
- reply_action(%Process{state | errno: errno}, :read_stderr, {:error, errno})
- end
- end
-
- defp do_read_any(state, stream_hint \\ :stdout) do
- %Process{read_any: pending, use_stderr: use_stderr} = state
-
- other_stream =
- case stream_hint do
- :stdout -> :stderr
- :stderr -> :stdout
+ ret ->
+ GenServer.reply(from, ret)
+ {:noreply, state}
end
-
- case Nif.nif_read(stream_fd(state, stream_hint), pending.size) do
- ret when ret in [eof(), eagain()] and use_stderr == true ->
- case {ret, Nif.nif_read(stream_fd(state, other_stream), pending.size)} do
- {eof(), eof()} ->
- reply_action(state, :read_any, :eof)
-
- {_, {:ok, binary}} ->
- reply_action(state, :read_any, {:ok, {other_stream, binary}})
-
- {_, eagain()} ->
- noreply_action(state)
-
- {_, {:error, errno}} ->
- reply_action(%Process{state | errno: errno}, :read_any, {:error, errno})
- end
-
- eof() ->
- reply_action(state, :read_any, :eof)
-
- {:ok, binary} ->
- reply_action(state, :read_any, {:ok, {stream_hint, binary}})
-
- eagain() ->
- noreply_action(state)
-
- {:error, errno} ->
- reply_action(%Process{state | errno: errno}, :read_any, {:error, errno})
end
end
- defp do_close(state, stream) do
- ret = Nif.nif_close(stream_fd(state, stream))
- {:reply, ret, state}
- end
-
- defp stream_fd(state, stream) do
- case stream do
- :stdin -> state.stdin
- :stdout -> state.stdout
- :stderr -> state.stderr
- end
- end
-
- defp can_read?(state, :stdout) do
- cond do
- state.read_stdout.client_pid ->
- {:error, :pending_stdout_read}
-
- true ->
- :ok
- end
- end
+ def handle_info({:select, read_resource, _ref, :ready_input}, state) do
+ pipe_name = State.pipe_name_for_fd(state, read_resource)
- defp can_read?(state, :stderr) do
- cond do
- !state.use_stderr ->
- {:error, :cannot_read_stderr}
+ with {:ok, operation_name} <- Operations.match_pending_operation(state, pipe_name),
+ {:ok, {_, from, _} = operation, state} <- State.pop_operation(state, operation_name) do
+ ret =
+ case operation_name do
+ :read_stdout_or_stderr ->
+ Operations.read_any(state, operation)
- state.read_stderr.client_pid ->
- {:error, :pending_stderr_read}
+ name when name in [:read_stdout, :read_stderr] ->
+ Operations.read(state, operation)
+ end
- true ->
- :ok
- end
- end
+ case ret do
+ {:noreply, state} ->
+ {:noreply, state}
- defp can_read?(state, :any) do
- with :ok <- can_read?(state, :stdout) do
- if state.use_stderr do
- can_read?(state, :stderr)
- else
- :ok
+ ret ->
+ GenServer.reply(from, ret)
+ {:noreply, state}
end
+ else
+ {:error, _error} ->
+ {:noreply, state}
end
end
- defp signal(port, sig) when sig in [:sigkill, :sigterm] do
- case Port.info(port, :os_pid) do
- {:os_pid, os_pid} ->
- Nif.nif_kill(os_pid, sig)
-
- nil ->
- {:error, :process_not_alive}
- end
- end
-
- @spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
-
- defp start_process(state) do
- %{args: %{cmd_with_args: cmd_with_args, cd: cd, env: env}, use_stderr: use_stderr} = state
-
- socket_path = socket_path()
- {:ok, sock} = :socket.open(:local, :stream, :default)
-
- try do
- :ok = socket_bind(sock, socket_path)
- :ok = :socket.listen(sock)
-
- spawner_cmdline_args = [socket_path, to_string(use_stderr) | cmd_with_args]
-
- port_opts =
- [:nouse_stdio, :exit_status, :binary, args: spawner_cmdline_args] ++
- prune_nils(env: env, cd: cd)
-
- port = Port.open({:spawn_executable, @spawner_path}, port_opts)
-
- {:os_pid, os_pid} = Port.info(port, :os_pid)
- Exile.Watcher.watch(self(), os_pid, socket_path)
-
- {stdin, stdout, stderr} = receive_fds(sock, state.use_stderr)
-
- %Process{
- state
- | port: port,
- status: :start,
- socket_path: socket_path,
- stdin: stdin,
- stdout: stdout,
- stderr: stderr
- }
- after
- :socket.close(sock)
+ def handle_info({:stop, :sigterm}, state) do
+ if state.status == :running do
+ signal(state.port, :sigkill)
+ Elixir.Process.send_after(self(), {:stop, :sigkill}, @os_signal_timeout)
end
- end
-
- @socket_timeout 2000
-
- defp receive_fds(lsock, use_stderr) do
- {:ok, sock} = :socket.accept(lsock, @socket_timeout)
-
- try do
- {:ok, msg} = :socket.recvmsg(sock, @socket_timeout)
- %{ctrl: [%{data: data, level: :socket, type: :rights}]} = msg
-
- <<stdin_fd::native-32, stdout_fd::native-32, stderr_fd::native-32, _::binary>> = data
-
- {:ok, stdout} = Nif.nif_create_fd(stdout_fd)
- {:ok, stdin} = Nif.nif_create_fd(stdin_fd)
-
- {:ok, stderr} =
- if use_stderr do
- Nif.nif_create_fd(stderr_fd)
- else
- {:ok, nil}
- end
- {stdin, stdout, stderr}
- after
- :socket.close(sock)
- end
+ {:noreply, state}
end
- defp socket_bind(sock, path) do
- case :socket.bind(sock, %{family: :local, path: path}) do
- :ok -> :ok
- # for OTP version <= 24 compatibility
- {:ok, _} -> :ok
- other -> other
+ def handle_info({:stop, :sigkill}, state) do
+ if state.status == :running do
+ # this should never happen, since sigkill signal can not be handled
+ {:stop, :sigkill_timeout, state}
+ else
+ {:noreply, state}
end
end
- defp socket_path do
- str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
- path = Path.join(System.tmp_dir!(), str)
- _ = :file.delete(path)
- path
+ def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state) do
+ send(state.owner, {state.exit_ref, {:ok, exit_status}})
+ state = State.set_status(state, {:exit, exit_status})
+ maybe_shutdown(state)
end
- defp prune_nils(kv) do
- Enum.reject(kv, fn {_, v} -> is_nil(v) end)
+ # we are only interested in Port exit signals
+ def handle_info({:EXIT, port, reason}, %State{port: port} = state) when reason != :normal do
+ send(state.owner, {state.exit_ref, {:error, reason}})
+ state = State.set_status(state, {:exit, {:error, reason}})
+ maybe_shutdown(state)
end
- defp reply_action(state, action, return_value) do
- pending = Map.fetch!(state, action)
-
- :ok = GenServer.reply(pending.client_pid, return_value)
- {:noreply, Map.put(state, action, %Pending{})}
+ def handle_info({:EXIT, port, :normal}, %{port: port} = state) do
+ maybe_shutdown(state)
end
- defp noreply_action(state) do
- {:noreply, state}
+ # shutdown unconditionally when process owner exit normally.
+ # Since Exile process is linked to the owner, in case of owner crash,
+ # exile process will be killed by the VM.
+ def handle_info(
+ {:DOWN, owner_ref, :process, _pid, reason},
+ %State{monitor_ref: owner_ref} = state
+ ) do
+ {:stop, reason, state}
end
- defp normalize_cmd(arg) do
- case arg do
- [cmd | _] when is_binary(cmd) ->
- path = System.find_executable(cmd)
+ def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
+ state =
+ Enum.reduce(state.pipes, state, fn {_pipe_name, pipe}, state ->
+ case Pipe.close(pipe, pid) do
+ {:ok, pipe} ->
+ {:ok, state} = State.put_pipe(state, pipe.name, pipe)
+ state
- if path do
- {:ok, to_charlist(path)}
- else
- {:error, "command not found: #{inspect(cmd)}"}
+ {:error, _} ->
+ state
end
+ end)
- _ ->
- {:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
- end
- end
-
- defp normalize_cmd_args([_ | args]) do
- if is_list(args) && Enum.all?(args, &is_binary/1) do
- {:ok, Enum.map(args, &to_charlist/1)}
- else
- {:error, "command arguments must be list of strings. #{inspect(args)}"}
- end
+ maybe_shutdown(state)
end
- defp normalize_cd(cd) do
- case cd do
- nil ->
- {:ok, ''}
-
- cd when is_binary(cd) ->
- if File.exists?(cd) && File.dir?(cd) do
- {:ok, to_charlist(cd)}
- else
- {:error, "`:cd` must be valid directory path"}
- end
+ @type signal :: :sigkill | :sigterm
- _ ->
- {:error, "`:cd` must be a binary string"}
- end
- end
+ @spec signal(port, signal) :: :ok | {:error, :invalid_signal} | {:error, :process_not_alive}
+ defp signal(port, signal) do
+ with true <- signal in [:sigkill, :sigterm],
+ {:os_pid, os_pid} <- Port.info(port, :os_pid) do
+ Nif.nif_kill(os_pid, signal)
+ else
+ false ->
+ {:error, :invalid_signal}
- defp normalize_env(env) do
- case env do
nil ->
- {:ok, []}
-
- env when is_list(env) or is_map(env) ->
- env =
- Enum.map(env, fn {key, value} ->
- {to_charlist(key), to_charlist(value)}
- end)
-
- {:ok, env}
-
- _ ->
- {:error, "`:env` must be a map or list of `{string, string}`"}
+ {:error, :process_not_alive}
end
end
- defp normalize_use_stderr(use_stderr) do
- case use_stderr do
- nil ->
- {:ok, false}
-
- use_stderr when is_boolean(use_stderr) ->
- {:ok, use_stderr}
+ @spec maybe_shutdown(State.t()) :: {:stop, :normal, State.t()} | {:noreply, State.t()}
+ defp maybe_shutdown(state) do
+ open_pipes_count =
+ state.pipes
+ |> Map.values()
+ |> Enum.count(&Pipe.open?/1)
- _ ->
- {:error, ":use_stderr must be a boolean"}
+ if open_pipes_count == 0 && !(state.status in [:init, :running]) do
+ {:stop, :normal, state}
+ else
+ {:noreply, state}
end
end
- defp validate_opts_fields(opts) do
- {_, additional_opts} = Keyword.split(opts, [:cd, :env, :use_stderr])
+ @spec exec(State.t()) :: State.t()
+ defp exec(state) do
+ %{
+ port: port,
+ stdin: stdin_fd,
+ stdout: stdout_fd,
+ stderr: stderr_fd
+ } = Exec.start(state.args, state.enable_stderr)
- if Enum.empty?(additional_opts) do
- :ok
- else
- {:error, "invalid opts: #{inspect(additional_opts)}"}
- end
- end
+ stderr =
+ if state.enable_stderr do
+ Pipe.new(:stderr, stderr_fd, state.owner)
+ else
+ Pipe.new(:stderr)
+ end
- defp normalize_args(cmd_with_args, opts) do
- with {:ok, cmd} <- normalize_cmd(cmd_with_args),
- {:ok, args} <- normalize_cmd_args(cmd_with_args),
- :ok <- validate_opts_fields(opts),
- {:ok, cd} <- normalize_cd(opts[:cd]),
- {:ok, use_stderr} <- normalize_use_stderr(opts[:use_stderr]),
- {:ok, env} <- normalize_env(opts[:env]) do
- {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, use_stderr: use_stderr}}
- end
+ %State{
+ state
+ | port: port,
+ status: :running,
+ pipes: %{
+ stdin: Pipe.new(:stdin, stdin_fd, state.owner),
+ stdout: Pipe.new(:stdout, stdout_fd, state.owner),
+ stderr: stderr
+ }
+ }
end
end
diff --git a/lib/exile/process/exec.ex b/lib/exile/process/exec.ex
new file mode 100644
index 0000000..ea6e104
--- /dev/null
+++ b/lib/exile/process/exec.ex
@@ -0,0 +1,195 @@
+defmodule Exile.Process.Exec do
+ @moduledoc false
+
+ alias Exile.Process.Nif
+
+ @type args :: %{
+ cmd_with_args: [String.t()],
+ cd: String.t(),
+ env: [{String.t(), String.t()}]
+ }
+
+ @spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
+
+ @spec start(args, boolean()) :: %{
+ port: port,
+ stdin: non_neg_integer(),
+ stdout: non_neg_integer(),
+ stderr: non_neg_integer()
+ }
+ def start(
+ %{
+ cmd_with_args: cmd_with_args,
+ cd: cd,
+ env: env
+ },
+ enable_stderr
+ ) do
+ socket_path = socket_path()
+ {:ok, sock} = :socket.open(:local, :stream, :default)
+
+ try do
+ :ok = socket_bind(sock, socket_path)
+ :ok = :socket.listen(sock)
+
+ spawner_cmdline_args = [socket_path, to_string(enable_stderr) | cmd_with_args]
+
+ port_opts =
+ [:nouse_stdio, :exit_status, :binary, args: spawner_cmdline_args] ++
+ prune_nils(env: env, cd: cd)
+
+ port = Port.open({:spawn_executable, @spawner_path}, port_opts)
+
+ {:os_pid, os_pid} = Port.info(port, :os_pid)
+ Exile.Watcher.watch(self(), os_pid, socket_path)
+
+ {stdin_fd, stdout_fd, stderr_fd} = receive_fds(sock, enable_stderr)
+
+ %{port: port, stdin: stdin_fd, stdout: stdout_fd, stderr: stderr_fd}
+ after
+ :socket.close(sock)
+ File.rm!(socket_path)
+ end
+ end
+
+ def normalize_exec_args(cmd_with_args, opts) do
+ with {:ok, cmd} <- normalize_cmd(cmd_with_args),
+ {:ok, args} <- normalize_cmd_args(cmd_with_args),
+ :ok <- validate_opts_fields(opts),
+ {:ok, cd} <- normalize_cd(opts[:cd]),
+ {:ok, enable_stderr} <- normalize_enable_stderr(opts[:enable_stderr]),
+ {:ok, env} <- normalize_env(opts[:env]) do
+ {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, enable_stderr: enable_stderr}}
+ end
+ end
+
+ @socket_timeout 2000
+
+ defp receive_fds(lsock, enable_stderr) do
+ {:ok, sock} = :socket.accept(lsock, @socket_timeout)
+
+ try do
+ {:ok, msg} = :socket.recvmsg(sock, @socket_timeout)
+ %{ctrl: [%{data: data, level: :socket, type: :rights}]} = msg
+
+ <<stdin_fd::native-32, stdout_fd::native-32, stderr_fd::native-32, _::binary>> = data
+
+ # FDs are managed by the NIF resource life-cycle
+ {:ok, stdout} = Nif.nif_create_fd(stdout_fd)
+ {:ok, stdin} = Nif.nif_create_fd(stdin_fd)
+
+ {:ok, stderr} =
+ if enable_stderr do
+ Nif.nif_create_fd(stderr_fd)
+ else
+ {:ok, nil}
+ end
+
+ {stdin, stdout, stderr}
+ after
+ :socket.close(sock)
+ end
+ end
+
+ defp socket_bind(sock, path) do
+ case :socket.bind(sock, %{family: :local, path: path}) do
+ :ok -> :ok
+ # for compatibility with OTP version < 24
+ {:ok, _} -> :ok
+ other -> other
+ end
+ end
+
+ defp socket_path do
+ str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
+ path = Path.join(System.tmp_dir!(), str)
+ _ = :file.delete(path)
+ path
+ end
+
+ defp prune_nils(kv) do
+ Enum.reject(kv, fn {_, v} -> is_nil(v) end)
+ end
+
+ defp normalize_cmd(arg) do
+ case arg do
+ [cmd | _] when is_binary(cmd) ->
+ path = System.find_executable(cmd)
+
+ if path do
+ {:ok, to_charlist(path)}
+ else
+ {:error, "command not found: #{inspect(cmd)}"}
+ end
+
+ _ ->
+ {:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
+ end
+ end
+
+ defp normalize_cmd_args([_ | args]) do
+ if is_list(args) && Enum.all?(args, &is_binary/1) do
+ {:ok, Enum.map(args, &to_charlist/1)}
+ else
+ {:error, "command arguments must be list of strings. #{inspect(args)}"}
+ end
+ end
+
+ defp normalize_cd(cd) do
+ case cd do
+ nil ->
+ {:ok, ''}
+
+ cd when is_binary(cd) ->
+ if File.exists?(cd) && File.dir?(cd) do
+ {:ok, to_charlist(cd)}
+ else
+ {:error, "`:cd` must be valid directory path"}
+ end
+
+ _ ->
+ {:error, "`:cd` must be a binary string"}
+ end
+ end
+
+ defp normalize_env(env) do
+ case env do
+ nil ->
+ {:ok, []}
+
+ env when is_list(env) or is_map(env) ->
+ env =
+ Enum.map(env, fn {key, value} ->
+ {to_charlist(key), to_charlist(value)}
+ end)
+
+ {:ok, env}
+
+ _ ->
+ {:error, "`:env` must be a map or list of `{string, string}`"}
+ end
+ end
+
+ defp normalize_enable_stderr(enable_stderr) do
+ case enable_stderr do
+ nil ->
+ {:ok, false}
+
+ enable_stderr when is_boolean(enable_stderr) ->
+ {:ok, enable_stderr}
+
+ _ ->
+ {:error, ":enable_stderr must be a boolean"}
+ end
+ end
+
+ defp validate_opts_fields(opts) do
+ {_, additional_opts} = Keyword.split(opts, [:cd, :env, :enable_stderr])
+
+ if Enum.empty?(additional_opts) do
+ :ok
+ else
+ {:error, "invalid opts: #{inspect(additional_opts)}"}
+ end
+ end
+end
diff --git a/lib/exile/process_nif.ex b/lib/exile/process/nif.ex
similarity index 95%
rename from lib/exile/process_nif.ex
rename to lib/exile/process/nif.ex
index 2da0f15..3fede38 100644
--- a/lib/exile/process_nif.ex
+++ b/lib/exile/process/nif.ex
@@ -1,21 +1,21 @@
-defmodule Exile.ProcessNif do
+defmodule Exile.Process.Nif do
@moduledoc false
@on_load :load_nifs
def load_nifs do
nif_path = :filename.join(:code.priv_dir(:exile), "exile")
:erlang.load_nif(nif_path, 0)
end
def nif_is_os_pid_alive(_os_pid), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_kill(_os_pid, _signal), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_read(_fd, _max_size), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_create_fd(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_close(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_write(_fd, _bin), do: :erlang.nif_error(:nif_library_not_loaded)
end
diff --git a/lib/exile/process/operations.ex b/lib/exile/process/operations.ex
new file mode 100644
index 0000000..7f0391d
--- /dev/null
+++ b/lib/exile/process/operations.ex
@@ -0,0 +1,246 @@
+defmodule Exile.Process.Operations do
+ @moduledoc false
+
+ alias Exile.Process.Pipe
+ alias Exile.Process.State
+
+ @type t :: %__MODULE__{
+ write_stdin: write_operation() | nil,
+ read_stdout: read_operation() | nil,
+ read_stderr: read_operation() | nil,
+ read_stdout_or_stderr: read_any_operation() | nil
+ }
+
+ defstruct [:write_stdin, :read_stdout, :read_stderr, :read_stdout_or_stderr]
+
+ @spec new :: t
+ def new, do: %__MODULE__{}
+
+ @type write_operation ::
+ {:write_stdin, GenServer.from(), binary}
+
+ @type read_operation ::
+ {:read_stdout, GenServer.from(), non_neg_integer()}
+ | {:read_stderr, GenServer.from(), non_neg_integer()}
+
+ @type read_any_operation ::
+ {:read_stdout_or_stderr, GenServer.from(), non_neg_integer()}
+
+ @type operation :: write_operation() | read_operation() | read_any_operation()
+
+ @type name :: :write_stdin | :read_stdout | :read_stderr | :read_stdout_or_stderr
+
+ @spec get(t, name) :: {:ok, operation()} | {:error, term}
+ def get(operations, name) do
+ {:ok, Map.fetch!(operations, name)}
+ end
+
+ @spec pop(t, name) :: {:ok, operation, t} | {:error, term}
+ def pop(operations, name) do
+ case Map.get(operations, name) do
+ nil ->
+ {:error, :operation_not_found}
+
+ operation ->
+ {:ok, operation, Map.put(operations, name, nil)}
+ end
+ end
+
+ @spec put(t, operation()) :: {:ok, t} | {:error, term}
+ def put(operations, operation) do
+ with {:ok, {op_name, _from, _arg} = operation} <- validate_operation(operation) do
+ {:ok, Map.put(operations, op_name, operation)}
+ end
+ end
+
+ @spec read_any(State.t(), read_any_operation()) ::
+ :eof
+ | {:noreply, State.t()}
+ | {:ok, {:stdout | :stderr, binary}}
+ | {:error, term}
+ def read_any(state, {:read_stdout_or_stderr, _from, _size} = operation) do
+ with {:ok, {_name, {caller, _}, arg}} <- validate_read_any_operation(operation),
+ first <- pipe_name(operation),
+ {:ok, primary} <- State.pipe(state, first),
+ second <- if(first == :stdout, do: :stderr, else: :stdout),
+ {:ok, secondary} <- State.pipe(state, second),
+ {:error, :eagain} <- do_read_any(caller, arg, primary, secondary),
+ {:ok, new_state} <- State.put_operation(state, operation) do
+ # dbg(new_state)
+ {:noreply, new_state}
+ end
+ end
+
+ @spec read(State.t(), read_operation()) ::
+ :eof
+ | {:noreply, State.t()}
+ | {:ok, binary}
+ | {:error, term}
+ def read(state, operation) do
+ with {:ok, {_name, {caller, _}, arg}} <- validate_read_operation(operation),
+ {:ok, pipe} <- State.pipe(state, pipe_name(operation)),
+ {:error, :eagain} <- Pipe.read(pipe, arg, caller),
+ {:ok, new_state} <- State.put_operation(state, operation) do
+ {:noreply, new_state}
+ end
+ end
+
+ @spec write(State.t(), write_operation()) ::
+ :ok
+ | {:noreply, State.t()}
+ | {:error, :epipe}
+ | {:error, term}
+ def write(state, operation) do
+ with {:ok, {_name, {caller, _}, bin}} <- validate_write_operation(operation),
+ pipe_name <- pipe_name(operation),
+ {:ok, pipe} <- State.pipe(state, pipe_name) do
+ case Pipe.write(pipe, bin, caller) do
+ {:ok, size} ->
+ handle_successful_write(state, size, operation)
+
+ {:error, :eagain} ->
+ case State.put_operation(state, operation) do
+ {:ok, new_state} ->
+ {:noreply, new_state}
+
+ error ->
+ error
+ end
+
+ ret ->
+ ret
+ end
+ end
+ end
+
+ @spec match_pending_operation(State.t(), Pipe.name()) ::
+ {:ok, name} | {:error, :no_pending_operation}
+ # credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
+ def match_pending_operation(state, pipe_name) do
+ cond do
+ state.operations.read_stdout_or_stderr &&
+ pipe_name in [:stdout, :stderr] ->
+ {:ok, :read_stdout_or_stderr}
+
+ state.operations.read_stdout &&
+ pipe_name == :stdout ->
+ {:ok, :read_stdout}
+
+ state.operations.read_stderr &&
+ pipe_name == :stderr ->
+ {:ok, :read_stderr}
+
+ state.operations.write_stdin &&
+ pipe_name == :stdin ->
+ {:ok, :write_stdin}
+
+ true ->
+ {:error, :no_pending_operation}
+ end
+ end
+
+ @spec handle_successful_write(State.t(), non_neg_integer(), write_operation()) ::
+ :ok | {:noreply, State.t()} | {:error, term}
+ defp handle_successful_write(state, written_size, {name, from, bin}) do
+ bin_size = byte_size(bin)
+
+ # check if it is partial write
+ if written_size < bin_size do
+ new_bin = binary_part(bin, written_size, bin_size - written_size)
+
+ case State.put_operation(state, {name, from, new_bin}) do
+ {:ok, new_state} ->
+ {:noreply, new_state}
+
+ error ->
+ error
+ end
+ else
+ :ok
+ end
+ end
+
+ @spec do_read_any(pid, non_neg_integer(), Pipe.t(), Pipe.t()) ::
+ :eof | {:ok, {Pipe.name(), binary}} | {:error, term}
+ defp do_read_any(caller, size, primary, secondary) do
+ case Pipe.read(primary, size, caller) do
+ ret1 when ret1 in [:eof, {:error, :eagain}, {:error, :pipe_closed_or_invalid_caller}] ->
+ case {ret1, Pipe.read(secondary, size, caller)} do
+ {:eof, :eof} ->
+ :eof
+
+ {_, {:ok, bin}} ->
+ {:ok, {secondary.name, bin}}
+
+ {ret1, {:error, :pipe_closed_or_invalid_caller}} ->
+ ret1
+
+ {_, ret2} ->
+ ret2
+ end
+
+ {:ok, bin} ->
+ {:ok, {primary.name, bin}}
+
+ ret1 ->
+ ret1
+ end
+ end
+
+ @spec validate_read_any_operation(operation) ::
+ {:ok, read_any_operation()} | {:error, :invalid_operation}
+ defp validate_read_any_operation(operation) do
+ case operation do
+ {:read_stdout_or_stderr, _from, size} when is_integer(size) and size >= 0 ->
+ {:ok, operation}
+
+ _ ->
+ {:error, :invalid_operation}
+ end
+ end
+
+ @spec validate_read_operation(operation) ::
+ {:ok, read_operation()} | {:error, :invalid_operation}
+ defp validate_read_operation(operation) do
+ case operation do
+ {:read_stdout, _from, size} when is_integer(size) and size >= 0 ->
+ {:ok, operation}
+
+ {:read_stderr, _from, size} when is_integer(size) and size >= 0 ->
+ {:ok, operation}
+
+ _ ->
+ {:error, :invalid_operation}
+ end
+ end
+
+ @spec validate_write_operation(operation) ::
+ {:ok, write_operation()} | {:error, :invalid_operation}
+ defp validate_write_operation(operation) do
+ case operation do
+ {:write_stdin, _from, bin} when is_binary(bin) ->
+ {:ok, operation}
+
+ _ ->
+ {:error, :invalid_operation}
+ end
+ end
+
+ @spec validate_operation(operation) :: {:ok, operation()} | {:error, :invalid_operation}
+ defp validate_operation(operation) do
+ with {:error, :invalid_operation} <- validate_read_operation(operation),
+ {:error, :invalid_operation} <- validate_read_any_operation(operation) do
+ validate_write_operation(operation)
+ end
+ end
+
+ @spec pipe_name(operation()) :: :stdin | :stdout | :stderr
+ defp pipe_name({op, _from, _}) do
+ case op do
+ :write_stdin -> :stdin
+ :read_stdout -> :stdout
+ :read_stderr -> :stderr
+ :read_stdout_or_stderr -> :stdout
+ end
+ end
+end
diff --git a/lib/exile/process/pipe.ex b/lib/exile/process/pipe.ex
new file mode 100644
index 0000000..c4d0a5a
--- /dev/null
+++ b/lib/exile/process/pipe.ex
@@ -0,0 +1,91 @@
+defmodule Exile.Process.Pipe do
+ @moduledoc false
+
+ alias Exile.Process.Nif
+
+ @type name :: Exile.Process.pipe_name()
+
+ @type fd :: non_neg_integer()
+
+ @type t :: %__MODULE__{
+ name: name,
+ fd: pos_integer() | nil,
+ monitor_ref: reference() | nil,
+ owner: pid | nil,
+ status: :open | :closed
+ }
+
+ defstruct [:name, :fd, :monitor_ref, :owner, status: :init]
+
+ alias __MODULE__
+
+ @spec new(name, pos_integer, pid) :: t
+ def new(name, fd, owner) do
+ if name in [:stdin, :stdout, :stderr] do
+ ref = Process.monitor(owner)
+ %Pipe{name: name, fd: fd, status: :open, owner: owner, monitor_ref: ref}
+ else
+ raise "invalid pipe name"
+ end
+ end
+
+ @spec new(name) :: t
+ def new(name) do
+ if name in [:stdin, :stdout, :stderr] do
+ %Pipe{name: name, status: :closed}
+ else
+ raise "invalid pipe name"
+ end
+ end
+
+ @spec open?(t) :: boolean()
+ def open?(pipe), do: pipe.status == :open
+
+ @spec read(t, non_neg_integer, pid) :: :eof | {:ok, binary} | {:error, :eagain} | {:error, term}
+ def read(pipe, size, caller) do
+ if caller != pipe.owner do
+ {:error, :pipe_closed_or_invalid_caller}
+ else
+ case Nif.nif_read(pipe.fd, size) do
+ # normalize return value
+ {:ok, <<>>} -> :eof
+ ret -> ret
+ end
+ end
+ end
+
+ @spec write(t, binary, pid) :: {:ok, size :: non_neg_integer()} | {:error, term}
+ def write(pipe, bin, caller) do
+ if caller != pipe.owner do
+ {:error, :pipe_closed_or_invalid_caller}
+ else
+ Nif.nif_write(pipe.fd, bin)
+ end
+ end
+
+ @spec close(t, pid) :: {:ok, t} | {:error, :pipe_closed_or_invalid_caller}
+ def close(pipe, caller) do
+ if caller != pipe.owner do
+ {:error, :pipe_closed_or_invalid_caller}
+ else
+ Process.demonitor(pipe.monitor_ref, [:flush])
+ Nif.nif_close(pipe.fd)
+ pipe = %Pipe{pipe | status: :closed, monitor_ref: nil, owner: nil}
+
+ {:ok, pipe}
+ end
+ end
+
+ @spec set_owner(t, pid) :: {:ok, t} | {:error, :closed}
+ def set_owner(pipe, new_owner) do
+ if pipe.status == :open do
+ ref = Process.monitor(new_owner)
+ Process.demonitor(pipe.monitor_ref, [:flush])
+ pipe = %Pipe{pipe | owner: new_owner, monitor_ref: ref}
+
+ {:ok, pipe}
+ else
+ {:error, :closed}
+ end
+ end
+end
diff --git a/lib/exile/process/state.ex b/lib/exile/process/state.ex
new file mode 100644
index 0000000..8775f8e
--- /dev/null
+++ b/lib/exile/process/state.ex
@@ -0,0 +1,99 @@
+defmodule Exile.Process.State do
+ @moduledoc false
+
+ alias Exile.Process.Exec
+ alias Exile.Process.Operations
+ alias Exile.Process.Pipe
+
+ alias __MODULE__
+
+ @type read_mode :: :stdout | :stderr | :stdout_or_stderr
+
+ @type pipes :: %{
+ stdin: Pipe.t(),
+ stdout: Pipe.t(),
+ stderr: Pipe.t()
+ }
+
+ @type status ::
+ :init
+ | :running
+ | {:exit, non_neg_integer()}
+ | {:exit, {:error, error :: term}}
+
+ @type t :: %__MODULE__{
+ args: Exec.args(),
+ owner: pid,
+ port: port(),
+ pipes: pipes,
+ status: status,
+ enable_stderr: boolean(),
+ operations: Operations.t(),
+ exit_ref: reference(),
+ monitor_ref: reference()
+ }
+
+ defstruct [
+ :args,
+ :owner,
+ :port,
+ :pipes,
+ :status,
+ :enable_stderr,
+ :operations,
+ :exit_ref,
+ :monitor_ref
+ ]
+
+ alias __MODULE__
+
+ @spec pipe(State.t(), name :: Pipe.name()) :: {:ok, Pipe.t()} | {:error, :invalid_name}
+ def pipe(%State{} = state, name) do
+ if name in [:stdin, :stdout, :stderr] do
+ {:ok, Map.fetch!(state.pipes, name)}
+ else
+ {:error, :invalid_name}
+ end
+ end
+
+ @spec put_pipe(State.t(), name :: Pipe.name(), Pipe.t()) :: {:ok, t} | {:error, :invalid_name}
+ def put_pipe(%State{} = state, name, pipe) do
+ if name in [:stdin, :stdout, :stderr] do
+ pipes = Map.put(state.pipes, name, pipe)
+ state = %State{state | pipes: pipes}
+ {:ok, state}
+ else
+ {:error, :invalid_name}
+ end
+ end
+
+ @spec pipe_name_for_fd(State.t(), fd :: Pipe.fd()) :: Pipe.name()
+ def pipe_name_for_fd(state, fd) do
+ pipe =
+ state.pipes
+ |> Map.values()
+ |> Enum.find(&(&1.fd == fd))
+
+ pipe.name
+ end
+
+ @spec put_operation(State.t(), Operations.operation()) :: {:ok, t} | {:error, term}
+ def put_operation(%State{operations: ops} = state, operation) do
+ with {:ok, ops} <- Operations.put(ops, operation) do
+ {:ok, %State{state | operations: ops}}
+ end
+ end
+
+ @spec pop_operation(State.t(), Operations.name()) ::
+ {:ok, Operations.operation(), t} | {:error, term}
+ def pop_operation(%State{operations: ops} = state, name) do
+ with {:ok, operation, ops} <- Operations.pop(ops, name) do
+ {:ok, operation, %State{state | operations: ops}}
+ end
+ end
+
+ @spec set_status(State.t(), status) :: State.t()
+ def set_status(state, status) do
+ %State{state | status: status}
+ end
+end
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index 0e672a6..57ef065 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,206 +1,258 @@
defmodule Exile.Stream do
@moduledoc """
Defines a `Exile.Stream` struct returned by `Exile.stream!/2`.
"""
alias Exile.Process
alias Exile.Process.Error
+ require Logger
+
defmodule Sink do
@moduledoc false
- defstruct [:process]
+ @type t :: %__MODULE__{process: Process.t(), ignore_epipe: boolean}
+
+ defstruct [:process, :ignore_epipe]
defimpl Collectable do
- def into(%{process: process} = stream) do
+ def into(%{process: process}) do
collector_fun = fn
:ok, {:cont, x} ->
- :ok = Process.write(process, x)
+ case Process.write(process, x) do
+ {:error, :epipe} ->
+ # there is no other way to stop a Collectable than to
+ # raise error, we catch this error and return `{:error, :epipe}`
+ raise Error, "epipe"
- :ok, :done ->
- :ok = Process.close_stdin(process)
- stream
+ :ok ->
+ :ok
+ end
- :ok, :halt ->
- :ok = Process.close_stdin(process)
+ acc, :done ->
+ acc
+
+ acc, :halt ->
+ acc
end
{:ok, collector_fun}
end
end
end
- defstruct [:process, :stream_opts]
+ defstruct [:process, :stream_opts, :writer_task]
- @type t :: %__MODULE__{}
+ @typedoc "Struct members are private, do not depend on them"
+ @type t :: %__MODULE__{process: Process.t(), stream_opts: map, writer_task: Task.t()}
@doc false
@spec __build__(nonempty_list(String.t()), keyword()) :: t()
def __build__(cmd_with_args, opts) do
{stream_opts, process_opts} =
- Keyword.split(opts, [:exit_timeout, :max_chunk_size, :input, :use_stderr])
+ Keyword.split(opts, [:exit_timeout, :max_chunk_size, :input, :enable_stderr, :ignore_epipe])
case normalize_stream_opts(stream_opts) do
{:ok, stream_opts} ->
- process_opts = Keyword.put(process_opts, :use_stderr, stream_opts[:use_stderr])
+ process_opts = Keyword.put(process_opts, :enable_stderr, stream_opts[:enable_stderr])
{:ok, process} = Process.start_link(cmd_with_args, process_opts)
- start_input_streamer(%Sink{process: process}, stream_opts.input)
- %Exile.Stream{process: process, stream_opts: stream_opts}
+
+ writer_task =
+ start_input_streamer(
+ %Sink{process: process, ignore_epipe: stream_opts[:ignore_epipe]},
+ stream_opts.input
+ )
+
+ %Exile.Stream{process: process, stream_opts: stream_opts, writer_task: writer_task}
{:error, error} ->
raise ArgumentError, message: error
end
end
@doc false
- defp start_input_streamer(sink, input) do
+ @spec start_input_streamer(term, term) :: Task.t()
+ defp start_input_streamer(%Sink{process: process} = sink, input) do
case input do
:no_input ->
- :ok
+ # use `Task.completed(:ok)` when bumping min Elixir requirement
+ Task.async(fn -> :ok end)
{:enumerable, enum} ->
- spawn_link(fn ->
- Enum.into(enum, sink)
+ Task.async(fn ->
+ Process.change_pipe_owner(process, :stdin, self())
+
+ try do
+ Enum.into(enum, sink)
+ rescue
+ Error ->
+ {:error, :epipe}
+ end
end)
{:collectable, func} ->
- spawn_link(fn ->
- func.(sink)
+ Task.async(fn ->
+ Process.change_pipe_owner(process, :stdin, self())
+
+ try do
+ func.(sink)
+ rescue
+ Error ->
+ {:error, :epipe}
+ end
end)
end
end
defimpl Enumerable do
def reduce(arg, acc, fun) do
- %{process: process, stream_opts: %{use_stderr: use_stderr} = stream_opts} = arg
+ %{
+ process: process,
+ stream_opts:
+ %{
+ enable_stderr: enable_stderr,
+ ignore_epipe: ignore_epipe
+ } = stream_opts,
+ writer_task: writer_task
+ } = arg
start_fun = fn -> :normal end
next_fun = fn :normal ->
case Process.read_any(process, stream_opts.max_chunk_size) do
:eof ->
{:halt, :normal}
- {:ok, {:stdout, x}} when use_stderr == false ->
+ {:ok, {:stdout, x}} when enable_stderr == false ->
{[IO.iodata_to_binary(x)], :normal}
- {:ok, {stream, x}} when use_stderr == true ->
+ {:ok, {stream, x}} when enable_stderr == true ->
{[{stream, IO.iodata_to_binary(x)}], :normal}
{:error, errno} ->
raise Error, "Failed to read from the external process. errno: #{errno}"
end
end
after_fun = fn exit_type ->
- try do
- # always close stdin before stopping to give the command chance to exit properly
- Process.close_stdin(process)
- result = Process.await_exit(process, stream_opts.exit_timeout)
+ result = Process.await_exit(process, stream_opts.exit_timeout)
+ writer_task_status = Task.await(writer_task)
- case {exit_type, result} do
- {_, :timeout} ->
- Process.kill(process, :sigkill)
- raise Error, "command fail to exit within timeout: #{stream_opts[:exit_timeout]}"
+ case {exit_type, result, writer_task_status} do
+ {:normal, {:ok, 0}, :ok} ->
+ :ok
- {:normal, {:ok, {:exit, 0}}} ->
- :ok
+ {:normal, {:ok, _error}, {:error, :epipe}} when ignore_epipe ->
+ :ok
- {:normal, {:ok, error}} ->
- raise Error, "command exited with status: #{inspect(error)}"
+ {:normal, {:ok, _error}, {:error, :epipe}} ->
+ raise Error, "abnormal command exit, received EPIPE while writing to stdin"
- {exit_type, error} ->
- Process.kill(process, :sigkill)
- raise Error, "command exited with exit_type: #{exit_type}, error: #{inspect(error)}"
- end
- after
- Process.stop(process)
+ {:normal, {:ok, error}, _} ->
+ raise Error, "command exited with status: #{inspect(error)}"
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
def count(_stream) do
{:error, __MODULE__}
end
def member?(_stream, _term) do
{:error, __MODULE__}
end
def slice(_stream) do
{:error, __MODULE__}
end
end
+ @spec normalize_input(term) ::
+ {:ok, :no_input} | {:ok, {:enumerable, term}} | {:ok, {:collectable, function}}
defp normalize_input(term) do
cond do
is_nil(term) ->
{:ok, :no_input}
!is_function(term) && Enumerable.impl_for(term) ->
{:ok, {:enumerable, term}}
is_function(term, 1) ->
{:ok, {:collectable, term}}
true ->
{:error, "`:input` must be either Enumerable or a function which accepts collectable"}
end
end
defp normalize_max_chunk_size(max_chunk_size) do
case max_chunk_size do
nil ->
{:ok, 65_536}
max_chunk_size when is_integer(max_chunk_size) and max_chunk_size > 0 ->
{:ok, max_chunk_size}
_ ->
{:error, ":max_chunk_size must be a positive integer"}
end
end
defp normalize_exit_timeout(timeout) do
case timeout do
nil ->
- {:ok, :infinity}
+ {:ok, 5000}
timeout when is_integer(timeout) and timeout > 0 ->
{:ok, timeout}
_ ->
{:error, ":exit_timeout must be either :infinity or an integer"}
end
end
- defp normalize_use_stderr(use_stderr) do
- case use_stderr do
+ defp normalize_enable_stderr(enable_stderr) do
+ case enable_stderr do
+ nil ->
+ {:ok, false}
+
+ enable_stderr when is_boolean(enable_stderr) ->
+ {:ok, enable_stderr}
+
+ _ ->
+ {:error, ":enable_stderr must be a boolean"}
+ end
+ end
+
+ defp normalize_ignore_epipe(ignore_epipe) do
+ case ignore_epipe do
nil ->
{:ok, false}
- use_stderr when is_boolean(use_stderr) ->
- {:ok, use_stderr}
+ ignore_epipe when is_boolean(ignore_epipe) ->
+ {:ok, ignore_epipe}
_ ->
- {:error, ":use_stderr must be a boolean"}
+ {:error, ":ignore_epipe must be a boolean"}
end
end
defp normalize_stream_opts(opts) do
with {:ok, input} <- normalize_input(opts[:input]),
{:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]),
{:ok, max_chunk_size} <- normalize_max_chunk_size(opts[:max_chunk_size]),
- {:ok, use_stderr} <- normalize_use_stderr(opts[:use_stderr]) do
+ {:ok, enable_stderr} <- normalize_enable_stderr(opts[:enable_stderr]),
+ {:ok, ignore_epipe} <- normalize_ignore_epipe(opts[:ignore_epipe]) do
{:ok,
%{
input: input,
exit_timeout: exit_timeout,
max_chunk_size: max_chunk_size,
- use_stderr: use_stderr
+ enable_stderr: enable_stderr,
+ ignore_epipe: ignore_epipe
}}
end
end
end
diff --git a/lib/exile/watcher.ex b/lib/exile/watcher.ex
index 12ce45e..f659cb1 100644
--- a/lib/exile/watcher.ex
+++ b/lib/exile/watcher.ex
@@ -1,78 +1,82 @@
defmodule Exile.Watcher do
@moduledoc false
use GenServer, restart: :temporary
+
require Logger
- alias Exile.ProcessNif, as: Nif
+ alias Exile.Process.Nif, as: Nif
def start_link(args) do
{:ok, _pid} = GenServer.start_link(__MODULE__, args)
end
def watch(pid, os_pid, socket_path) do
spec = {Exile.Watcher, %{pid: pid, os_pid: os_pid, socket_path: socket_path}}
DynamicSupervisor.start_child(Exile.WatcherSupervisor, spec)
end
+ @impl true
def init(args) do
%{pid: pid, os_pid: os_pid, socket_path: socket_path} = args
Process.flag(:trap_exit, true)
ref = Elixir.Process.monitor(pid)
{:ok, %{pid: pid, os_pid: os_pid, socket_path: socket_path, ref: ref}}
end
+ @impl true
def handle_info({:DOWN, ref, :process, pid, _reason}, %{
pid: pid,
socket_path: socket_path,
os_pid: os_pid,
ref: ref
}) do
- File.rm!(socket_path)
- attempt_graceful_exit(os_pid)
+ _ = File.rm(socket_path)
+ # at max we wait for 50ms for program to exit
+ if process_exit?(os_pid, 50) do
+ Logger.debug("External program exited automatically")
+ else
+ attempt_graceful_exit(os_pid)
+ end
+
{:stop, :normal, nil}
end
def handle_info({:EXIT, _, reason}, nil), do: {:stop, reason, nil}
# when watcher is attempted to be killed, we forcefully kill external os process.
# This can happen when beam receive SIGTERM
def handle_info({:EXIT, _, reason}, %{pid: pid, socket_path: socket_path, os_pid: os_pid}) do
Logger.debug(fn -> "Watcher exiting. reason: #{inspect(reason)}" end)
File.rm!(socket_path)
- Exile.Process.stop(pid)
+ Elixir.Process.exit(pid, :watcher_exit)
attempt_graceful_exit(os_pid)
{:stop, reason, nil}
end
defp attempt_graceful_exit(os_pid) do
- Logger.debug(fn -> "Stopping external program" end)
-
- # at max we wait for 100ms for program to exit
- process_exit?(os_pid, 100) && throw(:done)
-
Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
Nif.nif_kill(os_pid, :sigterm)
process_exit?(os_pid, 100) && throw(:done)
Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
Nif.nif_kill(os_pid, :sigkill)
process_exit?(os_pid, 200) && throw(:done)
Logger.error("failed to kill external process")
raise "Failed to kill external process"
catch
:done ->
Logger.debug(fn -> "External program exited successfully" end)
end
defp process_exit?(os_pid), do: !Nif.nif_is_os_pid_alive(os_pid)
defp process_exit?(os_pid, timeout) do
if process_exit?(os_pid) do
true
else
:timer.sleep(timeout)
process_exit?(os_pid)
end
end
end
diff --git a/mix.lock b/mix.lock
index 521ee50..c709796 100644
--- a/mix.lock
+++ b/mix.lock
@@ -1,24 +1,24 @@
%{
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
"credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"},
"dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"},
"earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"},
"elixir_make": {:hex, :elixir_make, "0.7.6", "67716309dc5d43e16b5abbd00c01b8df6a0c2ab54a8f595468035a50189f9169", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5a0569756b0f7873a77687800c164cca6dfc03a09418e6fcf853d78991f49940"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
- "ex_doc": {:hex, :ex_doc, "0.29.3", "f07444bcafb302db86e4f02d8bbcd82f2e881a0dcf4f3e4740e4b8128b9353f7", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3dc6787d7b08801ec3b51e9bd26be5e8826fbf1a17e92d1ebc252e1a1c75bfe1"},
+ "ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
"excoveralls": {:hex, :excoveralls, "0.16.1", "0bd42ed05c7d2f4d180331a20113ec537be509da31fed5c8f7047ce59ee5a7c5", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dae763468e2008cf7075a64cb1249c97cb4bc71e236c5c2b5e5cdf1cfa2bf138"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~> 2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
- "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
+ "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
- "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
+ "nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
}
diff --git a/test/exile/process_nif_test.exs b/test/exile/process_nif_test.exs
deleted file mode 100644
index 19c2584..0000000
--- a/test/exile/process_nif_test.exs
+++ /dev/null
@@ -1,3 +0,0 @@
-defmodule Exile.ProcessNifTest do
- use ExUnit.Case, async: false
-end
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index c34880a..41bc27d 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,410 +1,634 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
+
alias Exile.Process
+ alias Exile.Process.{Pipe, State}
- test "read" do
- {:ok, s} = Process.start_link(~w(echo test))
- assert {:ok, iodata} = Process.read(s, 100)
- assert :eof = Process.read(s, 100)
- assert IO.iodata_to_binary(iodata) == "test\n"
- assert :ok == Process.close_stdin(s)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
- Process.stop(s)
- end
+ describe "pipes" do
+ test "reading from stdout" do
+ {:ok, s} = Process.start_link(~w(echo test))
+ :timer.sleep(100)
- test "write" do
- {:ok, s} = Process.start_link(~w(cat))
- assert :ok == Process.write(s, "hello")
- assert {:ok, iodata} = Process.read(s, 5)
- assert IO.iodata_to_binary(iodata) == "hello"
-
- assert :ok == Process.write(s, "world")
- assert {:ok, iodata} = Process.read(s, 5)
- assert IO.iodata_to_binary(iodata) == "world"
-
- assert :ok == Process.close_stdin(s)
- assert :eof == Process.read(s)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
- Process.stop(s)
- end
+ assert {:ok, iodata} = Process.read(s, 100)
+ assert :eof = Process.read(s, 100)
+ assert IO.iodata_to_binary(iodata) == "test\n"
- test "stdin close" do
- logger = start_events_collector()
+ assert :ok == Process.close_stdin(s)
+ assert :ok == Process.close_stdout(s)
- # base64 produces output only after getting EOF from stdin. we
- # collect events in order and assert that we can still read from
- # stdout even after closing stdin
- {:ok, s} = Process.start_link(~w(base64))
+ assert {:ok, 0} == Process.await_exit(s, 500)
- # parallel reader should be blocked till we close stdin
- start_parallel_reader(s, logger)
- :timer.sleep(100)
+ refute Elixir.Process.alive?(s.pid)
+ end
- assert :ok == Process.write(s, "hello")
- add_event(logger, {:write, "hello"})
- assert :ok == Process.write(s, "world")
- add_event(logger, {:write, "world"})
- :timer.sleep(100)
+ test "write to stdin" do
+ {:ok, s} = Process.start_link(~w(cat))
- assert :ok == Process.close_stdin(s)
- add_event(logger, :input_close)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
- Process.stop(s)
+ assert :ok == Process.write(s, "hello")
+ assert {:ok, iodata} = Process.read(s, 5)
+ assert IO.iodata_to_binary(iodata) == "hello"
- assert [
- {:write, "hello"},
- {:write, "world"},
- :input_close,
- {:read, "aGVsbG93b3JsZA==\n"},
- :eof
- ] == get_events(logger)
- end
+ assert :ok == Process.write(s, "world")
+ assert {:ok, iodata} = Process.read(s, 5)
+ assert IO.iodata_to_binary(iodata) == "world"
- test "external command termination on stop" do
- {:ok, s} = Process.start_link(~w(cat))
- {:ok, os_pid} = Process.os_pid(s)
- assert os_process_alive?(os_pid)
+ assert :ok == Process.close_stdin(s)
+ assert :eof == Process.read(s)
- Process.stop(s)
- :timer.sleep(100)
+ assert {:ok, 0} == Process.await_exit(s, 100)
- refute os_process_alive?(os_pid)
- end
+ :timer.sleep(100)
+ refute Elixir.Process.alive?(s.pid)
+ end
- test "external command kill on stop" do
- {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ test "when stdin is closed" do
+ logger = start_events_collector()
+
+ # base64 produces output only after getting EOF from stdin. we
+ # collect events in order and assert that we can still read from
+ # stdout even after closing stdin
+ {:ok, s} = Process.start_link(~w(base64))
+
+ # parallel reader should be blocked till we close stdin
+ start_parallel_reader(s, logger)
+ :timer.sleep(100)
+
+ assert :ok == Process.write(s, "hello")
+ add_event(logger, {:write, "hello"})
+ assert :ok == Process.write(s, "world")
+ add_event(logger, {:write, "world"})
+ :timer.sleep(100)
+
+ assert :ok == Process.close_stdin(s)
+ add_event(logger, :input_close)
+ assert {:ok, 0} == Process.await_exit(s)
+ # Process.stop(s)
+
+ # wait for the reader to read
+ Elixir.Process.sleep(500)
+
+ assert [
+ {:write, "hello"},
+ {:write, "world"},
+ :input_close,
+ {:read, "aGVsbG93b3JsZA==\n"},
+ :eof
+ ] == get_events(logger)
+ end
- {:ok, os_pid} = Process.os_pid(s)
- assert os_process_alive?(os_pid)
- Process.stop(s)
+ test "reading from stderr" do
+ {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: true)
+ assert {:ok, "foo\n"} = Process.read_stderr(s, 100)
+ end
+
+ test "reading from stdout or stderr using read_any" do
+ script = """
+ echo "foo"
+ echo "bar" >&2
+ """
+
+ {:ok, s} = Process.start_link(["sh", "-c", script], enable_stderr: true)
+
+ {:ok, ret1} = Process.read_any(s, 100)
+ {:ok, ret2} = Process.read_any(s, 100)
+
+ assert {:stderr, "bar\n"} in [ret1, ret2]
+ assert {:stdout, "foo\n"} in [ret1, ret2]
+
+ assert :eof = Process.read_any(s, 100)
+ end
- if os_process_alive?(os_pid) do
+ test "reading from stderr_read when stderr disabled" do
+ {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: false)
+ assert {:error, :pipe_closed_or_invalid_caller} = Process.read_stderr(s, 100)
+ end
+
+ test "read_any with stderr disabled" do
+ script = """
+ echo "foo"
+ echo "bar" >&2
+ """
+
+ {:ok, s} = Process.start_link(["sh", "-c", script], enable_stderr: false)
+ {:ok, ret} = Process.read_any(s, 100)
+
+ # we can still read from stdout even if stderr is disabled
+ assert ret == {:stdout, "foo\n"}
+ assert :eof = Process.read_any(s, 100)
+ end
+
+ test "if pipe gets closed on pipe owner exit normally" do
+ {:ok, s} = Process.start_link(~w(sleep 10000))
+
+ writer =
+ Task.async(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+ end)
+
+ # stdin must be closed on task completion
+ :ok = Task.await(writer)
+
+ assert %State{
+ pipes: %{
+ stdin: %Pipe{
+ name: :stdin,
+ fd: _,
+ monitor_ref: nil,
+ owner: nil,
+ status: :closed
+ },
+ # ensure other pipes are unaffected
+ stdout: %Pipe{
+ name: :stdout,
+ status: :open
+ }
+ }
+ } = :sys.get_state(s.pid)
+ end
+
+ test "if pipe gets closed on pipe owner is killed" do
+ {:ok, s} = Process.start_link(~w(sleep 10000))
+
+ writer =
+ spawn(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+
+ receive do
+ :block -> :ok
+ end
+ end)
+
+ # wait for pipe owner to change
+ :timer.sleep(100)
+
+ # stdin must be closed on process kill
+ true = Elixir.Process.exit(writer, :kill)
:timer.sleep(1000)
- refute os_process_alive?(os_pid)
- else
- :ok
+
+ assert %State{
+ pipes: %{
+ stdin: %Pipe{
+ name: :stdin,
+ fd: _,
+ monitor_ref: nil,
+ owner: nil,
+ status: :closed
+ },
+ # ensure other pipes are unaffected
+ stdout: %Pipe{
+ name: :stdout,
+ status: :open
+ }
+ }
+ } = :sys.get_state(s.pid)
end
end
- test "exit status" do
- {:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
- assert {:ok, {:exit, 10}} == Process.await_exit(s, 500)
- Process.stop(s)
- end
+ describe "process termination" do
+ test "if external program terminates on process exit" do
+ {:ok, s} = Process.start_link(~w(cat))
+ {:ok, os_pid} = Process.os_pid(s)
- test "writing binary larger than pipe buffer size" do
- large_bin = generate_binary(5 * 65_535)
- {:ok, s} = Process.start_link(~w(cat))
+ assert os_process_alive?(os_pid)
- writer =
- Task.async(fn ->
- Process.write(s, large_bin)
- Process.close_stdin(s)
- end)
+ :ok = Process.close_stdin(s)
+ :timer.sleep(100)
+
+ refute os_process_alive?(os_pid)
+ end
+
+ test "watcher kills external command on process without exit_await" do
+ {os_pid, s} =
+ Task.async(fn ->
+ {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ {:ok, os_pid} = Process.os_pid(s)
+ assert os_process_alive?(os_pid)
+
+ # ensure the script set the correct signal handlers (handlers to ignore signal)
+ assert {:ok, "ignored signals\n"} = Process.read(s)
+
+ # exit without waiting for the exile process
+ {os_pid, s}
+ end)
+ |> Task.await()
+
+ :timer.sleep(500)
+
+ # Exile Process should exit after Task process terminates
+ refute Elixir.Process.alive?(s.pid)
+ refute os_process_alive?(os_pid)
+ end
+
+ test "await_exit with timeout" do
+ {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ {:ok, os_pid} = Process.os_pid(s)
+ assert os_process_alive?(os_pid)
+
+ assert {:ok, "ignored signals\n"} = Process.read(s)
+
+ # attempt to kill the process after 100ms
+ assert {:ok, 137} = Process.await_exit(s, 100)
+
+ refute os_process_alive?(os_pid)
+ refute Elixir.Process.alive?(s.pid)
+ end
+
+ test "exit status" do
+ {:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
+ assert {:ok, 10} == Process.await_exit(s)
+ end
+
+ test "writing binary larger than pipe buffer size" do
+ large_bin = generate_binary(5 * 65_535)
+ {:ok, s} = Process.start_link(~w(cat))
+
+ writer =
+ Task.async(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+ Process.write(s, large_bin)
+ end)
- :timer.sleep(100)
+ :timer.sleep(100)
- iodata =
- Stream.unfold(nil, fn _ ->
- case Process.read(s) do
- {:ok, data} -> {data, nil}
- :eof -> nil
- end
+ iodata =
+ Stream.unfold(nil, fn _ ->
+ case Process.read(s) do
+ {:ok, data} -> {data, nil}
+ :eof -> nil
+ end
+ end)
+ |> Enum.to_list()
+
+ Task.await(writer)
+
+ assert IO.iodata_length(iodata) == 5 * 65_535
+ assert {:ok, 0} == Process.await_exit(s, 500)
+ end
+
+ test "if exile process is terminated on owner exit even if pipe owner is alive" do
+ parent = self()
+
+ owner =
+ spawn(fn ->
+ # owner process terminated without await_exit
+ {:ok, s} = Process.start_link(~w(cat))
+
+ snd(parent, {:ok, s})
+ :exit = recv(parent)
+ end)
+
+ {:ok, s} = recv(owner)
+
+ spawn_link(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+ block()
end)
- |> Enum.to_list()
- Task.await(writer)
+ spawn_link(fn ->
+ Process.change_pipe_owner(s, :stdout, self())
+ block()
+ end)
- assert IO.iodata_length(iodata) == 5 * 65_535
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
- Process.stop(s)
- end
+ # wait for pipe owner to change
+ :timer.sleep(500)
- test "stderr_read" do
- {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], use_stderr: true)
- assert {:ok, "foo\n"} = Process.read_stderr(s, 100)
- Process.stop(s)
- end
+ snd(owner, :exit)
- test "stderr_read with stderr disabled" do
- {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], use_stderr: false)
- assert {:error, :cannot_read_stderr} = Process.read_stderr(s, 100)
- Process.stop(s)
- end
+ # wait for messages to propagate, if there are any
+ :timer.sleep(500)
- test "read_any" do
- script = """
- echo "foo"
- echo "bar" >&2
- """
+ refute Elixir.Process.alive?(owner)
+ refute Elixir.Process.alive?(s.pid)
+ end
- {:ok, s} = Process.start_link(["sh", "-c", script], use_stderr: true)
- {:ok, ret1} = Process.read_any(s, 100)
- {:ok, ret2} = Process.read_any(s, 100)
+ test "if exile process is *NOT* terminated on owner exit, if any pipe owner is alive" do
+ parent = self()
- assert {:stderr, "bar\n"} in [ret1, ret2]
- assert {:stdout, "foo\n"} in [ret1, ret2]
+ {:ok, s} = Process.start_link(~w(cat))
- assert :eof = Process.read_any(s, 100)
- Process.stop(s)
- end
+ io_proc =
+ spawn_link(fn ->
+ :ok = Process.change_pipe_owner(s, :stdin, self())
+ :ok = Process.change_pipe_owner(s, :stdout, self())
+ recv(parent)
+ end)
+
+ # wait for pipe owner to change
+ :timer.sleep(100)
+
+ # external process will be killed with SIGTERM (143)
+ assert {:ok, 143} = Process.await_exit(s, 100)
+
+ # wait for messages to propagate, if there are any
+ :timer.sleep(100)
+
+ assert Elixir.Process.alive?(s.pid)
+
+ assert %State{
+ pipes: %{
+ stdin: %Pipe{status: :open},
+ stdout: %Pipe{status: :open}
+ }
+ } = :sys.get_state(s.pid)
+
+ # when the io_proc exits, the pipes should be closed and process must terminate
+ snd(io_proc, :exit)
+ :timer.sleep(100)
+
+ refute Elixir.Process.alive?(s.pid)
+ end
+
+ test "when process is killed with a pending concurrent write" do
+ {:ok, s} = Process.start_link(~w(cat))
+ {:ok, os_pid} = Process.os_pid(s)
+
+ large_data =
+ Stream.cycle(["test"])
+ |> Stream.take(500_000)
+ |> Enum.to_list()
+ |> IO.iodata_to_binary()
+
+ task =
+ Task.async(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+ Process.write(s, large_data)
+ end)
+
+ # to avoid race conditions, like if process is killed before owner
+ # is changed
+ :timer.sleep(200)
+
+ assert {:ok, 1} == Process.await_exit(s)
+
+ refute os_process_alive?(os_pid)
+ assert {:error, :epipe} == Task.await(task)
+ end
+
+ test "if owner is killed when the exile process is killed" do
+ parent = self()
+
+ # create an exile process without linking to caller
+ owner =
+ spawn(fn ->
+ assert {:ok, s} = Process.start_link(~w(cat))
+ snd(parent, s.pid)
+ block()
+ end)
+
+ owner_ref = Elixir.Process.monitor(owner)
+
+ exile_pid = recv(owner)
- test "read_any with stderr disabled" do
- script = """
- echo "foo"
- echo "bar" >&2
- """
+ exile_ref = Elixir.Process.monitor(exile_pid)
- {:ok, s} = Process.start_link(["sh", "-c", script], use_stderr: false)
- {:ok, ret1} = Process.read_any(s, 100)
+ assert Elixir.Process.alive?(owner)
+ assert Elixir.Process.alive?(exile_pid)
- assert ret1 == {:stdout, "foo\n"}
+ true = Elixir.Process.exit(exile_pid, :kill)
+
+ assert_receive {:DOWN, ^owner_ref, :process, ^owner, :killed}
+ assert_receive {:DOWN, ^exile_ref, :process, ^exile_pid, :killed}
+ end
+
+ test "if exile process is killed when the owner is killed" do
+ parent = self()
+
+ # create an exile process without linking to caller
+ owner =
+ spawn(fn ->
+ assert {:ok, s} = Process.start_link(~w(cat))
+ snd(parent, s.pid)
+ block()
+ end)
- assert :eof = Process.read_any(s, 100)
- Process.stop(s)
+ owner_ref = Elixir.Process.monitor(owner)
+
+ exile_pid = recv(owner)
+
+ exile_ref = Elixir.Process.monitor(exile_pid)
+
+ assert Elixir.Process.alive?(owner)
+ assert Elixir.Process.alive?(exile_pid)
+
+ true = Elixir.Process.exit(owner, :kill)
+
+ assert_receive {:DOWN, ^owner_ref, :process, ^owner, :killed}
+ assert_receive {:DOWN, ^exile_ref, :process, ^exile_pid, :killed}
+ end
end
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
{:ok, s} = Process.start_link(~w(cat))
large_bin = generate_binary(65_535 * 5)
writer =
Task.async(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
:ok = Process.write(s, large_bin)
add_event(logger, {:write, IO.iodata_length(large_bin)})
- Process.close_stdin(s)
end)
:timer.sleep(50)
reader =
Task.async(fn ->
+ Process.change_pipe_owner(s, :stdout, self())
+
Stream.unfold(nil, fn _ ->
case Process.read(s) do
{:ok, data} ->
add_event(logger, {:read, IO.iodata_length(data)})
# delay in reading should delay writes
:timer.sleep(10)
{nil, nil}
:eof ->
nil
end
end)
|> Stream.run()
end)
Task.await(writer)
Task.await(reader)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
- Process.stop(s)
+ assert {:ok, 0} == Process.await_exit(s)
events = get_events(logger)
{write_events, read_evants} = Enum.split_with(events, &match?({:write, _}, &1))
assert Enum.sum(Enum.map(read_evants, fn {:read, size} -> size end)) ==
Enum.sum(Enum.map(write_events, fn {:write, size} -> size end))
# There must be a read before write completes
assert hd(events) == {:read, 65_535}
end
# this test does not work properly in linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
- Process.stop(s)
-
open_files = parse_lsof(bin)
- assert [%{fd: "0", name: _, type: "PIPE"}, %{type: "PIPE", fd: "1", name: _}] = open_files
- end
-
- test "process kill with pending write" do
- {:ok, s} = Process.start_link(~w(cat))
- {:ok, os_pid} = Process.os_pid(s)
-
- large_data =
- Stream.cycle(["test"]) |> Stream.take(500_000) |> Enum.to_list() |> IO.iodata_to_binary()
- task =
- Task.async(fn ->
- try do
- Process.write(s, large_data)
- catch
- :exit, reason -> reason
- end
- end)
-
- :timer.sleep(200)
- Process.stop(s)
- :timer.sleep(3000)
-
- refute os_process_alive?(os_pid)
- assert {:normal, _} = Task.await(task)
+ assert [
+ %{type: "PIPE", fd: "0", name: _},
+ %{type: "PIPE", fd: "1", name: _},
+ %{type: "CHR", fd: "2", name: "/dev/ttys007"}
+ ] = open_files
end
- test "concurrent read" do
- {:ok, s} = Process.start_link(~w(cat))
-
- task = Task.async(fn -> Process.read(s, 1) end)
+ describe "options and validation" do
+ test "cd option" do
+ parent = Path.expand("..", File.cwd!())
+ {:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
+ {:ok, dir} = Process.read(s)
- # delaying concurrent read to avoid race-condition
- Elixir.Process.sleep(100)
- assert {:error, :pending_stdout_read} = Process.read(s, 1)
-
- assert :ok == Process.close_stdin(s)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
- Process.stop(s)
- _ = Task.await(task)
- end
-
- test "cd" do
- parent = Path.expand("..", File.cwd!())
- {:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
- {:ok, dir} = Process.read(s)
- assert String.trim(dir) == parent
- assert {:ok, {:exit, 0}} = Process.await_exit(s)
- Process.stop(s)
- end
+ assert String.trim(dir) == parent
+ assert {:ok, 0} = Process.await_exit(s)
+ end
- test "invalid path" do
- assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
- end
+ test "when cd is invalid" do
+ assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
+ end
- test "invalid opt" do
- assert {:error, "invalid opts: [invalid: :test]"} =
- Process.start_link(~w(cat), invalid: :test)
- end
+ test "when user pass invalid option" do
+ assert {:error, "invalid opts: [invalid: :test]"} =
+ Process.start_link(~w(cat), invalid: :test)
+ end
- test "env" do
- assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
+ test "env option" do
+ assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
- assert {:ok, "test\n"} = Process.read(s)
- assert {:ok, {:exit, 0}} = Process.await_exit(s)
- Process.stop(s)
- end
+ assert {:ok, "test\n"} = Process.read(s)
+ assert {:ok, 0} = Process.await_exit(s)
+ end
- test "if external process inherits beam env" do
- :ok = System.put_env([{"BEAM_ENV_A", "10"}])
- assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
+ test "if external process inherits beam env" do
+ :ok = System.put_env([{"BEAM_ENV_A", "10"}])
+ assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
- assert {:ok, "10\n"} = Process.read(s)
- assert {:ok, {:exit, 0}} = Process.await_exit(s)
- Process.stop(s)
- end
+ assert {:ok, "10\n"} = Process.read(s)
+ assert {:ok, 0} = Process.await_exit(s)
+ end
- test "if user env overrides beam env" do
- :ok = System.put_env([{"BEAM_ENV", "base"}])
+ test "if user env overrides beam env" do
+ :ok = System.put_env([{"BEAM_ENV", "base"}])
- assert {:ok, s} =
- Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
+ assert {:ok, s} =
+ Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
- assert {:ok, "overridden\n"} = Process.read(s)
- assert {:ok, {:exit, 0}} = Process.await_exit(s)
- Process.stop(s)
+ assert {:ok, "overridden\n"} = Process.read(s)
+ assert {:ok, 0} = Process.await_exit(s)
+ end
end
- test "await_exit when process is stopped" do
- assert {:ok, s} = Process.start_link(~w(cat))
-
- tasks =
- Enum.map(1..10, fn _ ->
- Task.async(fn -> Process.await_exit(s) end)
- end)
-
- assert :ok == Process.close_stdin(s)
-
- Elixir.Process.sleep(100)
-
- Enum.each(tasks, fn task ->
- assert {:ok, {:exit, 0}} = Task.await(task)
+ def start_parallel_reader(process, logger) do
+ spawn_link(fn ->
+ :ok = Process.change_pipe_owner(process, :stdout, self())
+ reader_loop(process, logger)
end)
-
- Process.stop(s)
end
- def start_parallel_reader(proc_server, logger) do
- spawn_link(fn -> reader_loop(proc_server, logger) end)
- end
-
- def reader_loop(proc_server, logger) do
- case Process.read(proc_server) do
+ def reader_loop(process, logger) do
+ case Process.read(process) do
{:ok, data} ->
add_event(logger, {:read, data})
- reader_loop(proc_server, logger)
+ reader_loop(process, logger)
:eof ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
- Stream.repeatedly(fn -> "A" end) |> Enum.take(size) |> IO.iodata_to_binary()
+ Stream.repeatedly(fn -> "A" end)
+ |> Enum.take(size)
+ |> IO.iodata_to_binary()
+ end
+
+ defp block do
+ rand = :rand.uniform()
+
+ receive do
+ ^rand -> :ok
+ end
+ end
+
+ defp snd(pid, term) do
+ send(pid, {self(), term})
+ end
+
+ defp recv(sender) do
+ receive do
+ {^sender, term} -> term
+ after
+ 1000 ->
+ raise "recv timeout"
+ end
end
end
diff --git a/test/exile/sync_process_test.exs b/test/exile/sync_process_test.exs
index cb701fb..164a69c 100644
--- a/test/exile/sync_process_test.exs
+++ b/test/exile/sync_process_test.exs
@@ -1,54 +1,54 @@
defmodule Exile.SyncProcessTest do
use ExUnit.Case, async: false
alias Exile.Process
@bin Stream.repeatedly(fn -> "A" end) |> Enum.take(65_535) |> IO.iodata_to_binary()
test "memory leak" do
:timer.sleep(1000)
before_exec = :erlang.memory(:total)
{:ok, s} = Process.start_link(~w(cat))
Enum.each(1..500, fn _ ->
:ok = Process.write(s, @bin)
{:ok, _} = Process.read(s, 65_535)
end)
:timer.sleep(1000)
after_exec = :erlang.memory(:total)
assert_in_delta before_exec, after_exec, 1024 * 1024
assert :ok == Process.close_stdin(s)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
- Process.stop(s)
+ assert {:ok, 0} == Process.await_exit(s, 500)
+ # Process.stop(s)
end
test "if watcher process exits on command exit" do
stop_all_children(Exile.WatcherSupervisor)
assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
assert {:ok, s} = Process.start_link(~w(cat))
# we spawn in background
:timer.sleep(200)
assert %{active: 1, workers: 1} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
Process.close_stdin(s)
- assert {:ok, {:exit, 0}} = Process.await_exit(s, 500)
- Process.stop(s)
+ assert {:ok, 0} = Process.await_exit(s, 500)
+ # Process.stop(s)
# wait for watcher to terminate
:timer.sleep(200)
assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
end
defp stop_all_children(sup) do
DynamicSupervisor.which_children(sup)
|> Enum.each(fn {_, pid, _, _} ->
DynamicSupervisor.terminate_child(sup, pid)
end)
end
end
diff --git a/test/exile/watcher_test.exs b/test/exile/watcher_test.exs
index fc55ba0..82c8cb5 100644
--- a/test/exile/watcher_test.exs
+++ b/test/exile/watcher_test.exs
@@ -1,55 +1,47 @@
defmodule Exile.WatcherTest do
use ExUnit.Case, async: true
alias Exile.Process
- test "uds path socket cleanup after successful exit" do
+ test "when exile process exit normally" do
{:ok, s} = Process.start_link(~w(cat))
- %{socket_path: socket_path} = :sys.get_state(s)
-
- assert File.exists?(socket_path)
-
- :ok = Process.close_stdin(s)
- Elixir.Process.sleep(100)
-
- {:ok, {:exit, 0}} = Process.await_exit(s)
- :ok = Process.stop(s)
-
- Elixir.Process.sleep(100)
-
- refute File.exists?(socket_path)
- end
-
- test "external process and uds path cleanup on error" do
- {:ok, s} = Process.start_link(~w(cat))
- %{socket_path: socket_path} = :sys.get_state(s)
{:ok, os_pid} = Process.os_pid(s)
- assert File.exists?(socket_path)
assert os_process_alive?(os_pid)
- Elixir.Process.exit(s, :kill)
- Elixir.Process.sleep(100)
+ {:ok, 0} = Process.await_exit(s)
- refute File.exists?(socket_path)
refute os_process_alive?(os_pid)
end
- test "if external process is killed with SIGTERM" do
- {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ test "if external process is cleaned up when Exile Process is killed" do
+ parent = self()
- {:ok, os_pid} = Process.os_pid(s)
- assert os_process_alive?(os_pid)
- Process.stop(s)
+ # Exile process is linked to caller so we must run test this in
+ # separate process which is not linked
+ spawn(fn ->
+ {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ {:ok, os_pid} = Process.os_pid(s)
+ assert os_process_alive?(os_pid)
+ send(parent, os_pid)
+
+ Elixir.Process.exit(s.pid, :kill)
+ end)
+
+ os_pid =
+ receive do
+ os_pid -> os_pid
+ end
:timer.sleep(1000)
+
refute os_process_alive?(os_pid)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
end
diff --git a/test/exile_test.exs b/test/exile_test.exs
index a245e5f..fe7822a 100644
--- a/test/exile_test.exs
+++ b/test/exile_test.exs
@@ -1,71 +1,90 @@
defmodule ExileTest do
use ExUnit.Case
test "stream with enumerable" do
proc_stream =
- Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end), use_stderr: false)
+ Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end), enable_stderr: false)
- stdout = Enum.to_list(proc_stream)
+ stdout = proc_stream |> Enum.to_list()
assert IO.iodata_length(stdout) == 1000
end
test "stream with collectable" do
proc_stream =
Exile.stream!(["cat"], input: fn sink -> Enum.into(1..1000, sink, fn _ -> "a" end) end)
stdout = Enum.to_list(proc_stream)
assert IO.iodata_length(stdout) == 1000
end
test "stream without stdin" do
proc_stream = Exile.stream!(~w(echo hello))
stdout = Enum.to_list(proc_stream)
assert IO.iodata_to_binary(stdout) == "hello\n"
end
test "stderr" do
- proc_stream = Exile.stream!(["sh", "-c", "echo foo >>/dev/stderr"], use_stderr: true)
+ proc_stream = Exile.stream!(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: true)
assert {[], stderr} = split_stream(proc_stream)
assert IO.iodata_to_binary(stderr) == "foo\n"
end
test "multiple streams" do
script = """
for i in {1..1000}; do
echo "foo ${i}"
echo "bar ${i}" >&2
done
"""
- proc_stream = Exile.stream!(["sh", "-c", script], use_stderr: true)
+ proc_stream = Exile.stream!(["sh", "-c", script], enable_stderr: true)
{stdout, stderr} = split_stream(proc_stream)
stdout_lines = String.split(Enum.join(stdout), "\n", trim: true)
stderr_lines = String.split(Enum.join(stderr), "\n", trim: true)
assert length(stdout_lines) == length(stderr_lines)
assert Enum.all?(stdout_lines, &String.starts_with?(&1, "foo "))
assert Enum.all?(stderr_lines, &String.starts_with?(&1, "bar "))
end
test "environment variable" do
output =
Exile.stream!(~w(printenv FOO), env: %{"FOO" => "bar"})
|> Enum.to_list()
|> IO.iodata_to_binary()
assert output == "bar\n"
end
+ test "premature stream termination" do
+ input_stream = Stream.map(1..100_000, fn _ -> "hello" end)
+
+ assert_raise Exile.Process.Error,
+ "abnormal command exit, received EPIPE while writing to stdin",
+ fn ->
+ Exile.stream!(~w(cat), input: input_stream)
+ |> Enum.take(1)
+ end
+ end
+
+ test "premature stream termination when ignore_epipe is true" do
+ input_stream = Stream.map(1..100_000, fn _ -> "hello" end)
+
+ assert ["hello"] ==
+ Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true, max_chunk_size: 5)
+ |> Enum.take(1)
+ end
+
defp split_stream(stream) do
{stdout, stderr} =
Enum.reduce(stream, {[], []}, fn
{:stdout, data}, {stdout, stderr} -> {[data | stdout], stderr}
{:stderr, data}, {stdout, stderr} -> {stdout, [data | stderr]}
end)
{Enum.reverse(stdout), Enum.reverse(stderr)}
end
end
diff --git a/test/scripts/ignore_sigterm.sh b/test/scripts/ignore_sigterm.sh
index 67d05d3..e5fef5e 100755
--- a/test/scripts/ignore_sigterm.sh
+++ b/test/scripts/ignore_sigterm.sh
@@ -1,7 +1,10 @@
#!/bin/bash
trap -- '' SIGINT SIGTERM SIGTSTP
+
+echo "ignored signals"
+
while true; do
date +%F_%T
sleep 1
done

File Metadata

Mime Type
text/x-diff
Expires
Wed, Nov 27, 8:33 AM (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40586
Default Alt Text
(130 KB)

Event Timeline