Page MenuHomePhorge

No OneTemporary

Size
29 KB
Referenced Files
None
Subscribers
None
diff --git a/c_src/exile.c b/c_src/exile.c
index ebebaab..364c009 100644
--- a/c_src/exile.c
+++ b/c_src/exile.c
@@ -1,664 +1,666 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "erl_nif.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#ifdef ERTS_DIRTY_SCHEDULERS
#define USE_DIRTY_IO ERL_NIF_DIRTY_JOB_IO_BOUND
#else
#define USE_DIRTY_IO 0
#endif
//#define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
#define start_timing() ErlNifTime __start = enif_monotonic_time(ERL_NIF_USEC)
#define elapsed_microseconds() (enif_monotonic_time(ERL_NIF_USEC) - __start)
#else
#define debug(...)
#define start_timing()
#define elapsed_microseconds() 0
#endif
#define error(...) \
do { \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
#define GET_CTX(env, arg, ctx) \
do { \
ExilePriv *data = enif_priv_data(env); \
if (enif_get_resource(env, arg, data->exec_ctx_rt, (void **)&ctx) == \
false) { \
return make_error(env, ATOM_INVALID_CTX); \
} \
} while (0);
static const int PIPE_READ = 0;
static const int PIPE_WRITE = 1;
static const int PIPE_CLOSED = -1;
static const int CMD_EXIT = -1;
static const int MAX_ARGUMENTS = 20;
static const int MAX_ARGUMENT_LEN = 1024;
+static const int UNBUFFERED_READ = -1;
/* We are choosing an exit code which is not reserved see:
* https://www.tldp.org/LDP/abs/html/exitcodes.html. */
static const int FORK_EXEC_FAILURE = 125;
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_UNDEFINED;
static ERL_NIF_TERM ATOM_INVALID_CTX;
static ERL_NIF_TERM ATOM_PIPE_CLOSED;
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ALLOC_FAILED;
/* command exit types */
static ERL_NIF_TERM ATOM_EXIT;
static ERL_NIF_TERM ATOM_SIGNALED;
static ERL_NIF_TERM ATOM_STOPPED;
enum exec_status {
SUCCESS,
PIPE_CREATE_ERROR,
PIPE_FLAG_ERROR,
FORK_ERROR,
PIPE_DUP_ERROR,
NULL_DEV_OPEN_ERROR,
};
enum exit_type { NORMAL_EXIT, SIGNALED, STOPPED };
typedef struct ExilePriv {
ErlNifResourceType *exec_ctx_rt;
ErlNifResourceType *io_rt;
} ExilePriv;
typedef struct ExecContext {
int cmd_input_fd;
int cmd_output_fd;
int exit_status; // can be exit status or signal number depending on exit_type
enum exit_type exit_type;
pid_t pid;
// these are to hold enif_select resource objects
int *read_resource;
int *write_resource;
} ExecContext;
typedef struct StartProcessResult {
bool success;
int err;
ExecContext context;
} StartProcessResult;
/* TODO: assert if the external process is exit (?) */
static void exec_ctx_dtor(ErlNifEnv *env, void *obj) {
ExecContext *ctx = obj;
enif_release_resource(ctx->read_resource);
enif_release_resource(ctx->write_resource);
debug("Exile exec_ctx_dtor called");
}
static void exec_ctx_stop(ErlNifEnv *env, void *obj, int fd,
int is_direct_call) {
debug("Exile exec_ctx_stop called");
}
static void exec_ctx_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
debug("Exile exec_ctx_down called");
}
static ErlNifResourceTypeInit exec_ctx_rt_init = {exec_ctx_dtor, exec_ctx_stop,
exec_ctx_down};
static void io_resource_dtor(ErlNifEnv *env, void *obj) {
debug("Exile io_resource_dtor called");
}
static void io_resource_stop(ErlNifEnv *env, void *obj, int fd,
int is_direct_call) {
debug("Exile io_resource_stop called %d", fd);
}
static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
debug("Exile io_resource_down called");
}
static ErlNifResourceTypeInit io_rt_init = {io_resource_dtor, io_resource_stop,
io_resource_down};
static inline ERL_NIF_TERM make_ok(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_OK, term);
}
static inline ERL_NIF_TERM make_error(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_ERROR, term);
}
static int set_flag(int fd, int flags) {
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
}
static void close_all(int pipes[2][2]) {
for (int i = 0; i < 2; i++) {
if (pipes[i][PIPE_READ] > 0)
close(pipes[i][PIPE_READ]);
if (pipes[i][PIPE_WRITE] > 0)
close(pipes[i][PIPE_WRITE]);
}
}
/* time is assumed to be in microseconds */
static void notify_consumed_timeslice(ErlNifEnv *env, ErlNifTime start,
ErlNifTime stop) {
ErlNifTime pct;
pct = (ErlNifTime)((stop - start) / 10);
if (pct > 100)
pct = 100;
else if (pct == 0)
pct = 1;
enif_consume_timeslice(env, pct);
}
/* This is not ideal, but as of now there is no portable way to do this */
static void close_all_fds() {
int fd_limit = (int)sysconf(_SC_OPEN_MAX);
for (int i = STDERR_FILENO + 1; i < fd_limit; i++)
close(i);
}
static StartProcessResult start_proccess(char *args[], bool stderr_to_console) {
StartProcessResult result = {.success = false};
pid_t pid;
int pipes[2][2] = {{0, 0}, {0, 0}};
if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1) {
result.err = errno;
perror("[exile] failed to create pipes");
close_all(pipes);
return result;
}
const int r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
const int w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
const int r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
const int w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0) {
result.err = errno;
perror("[exile] failed to set flags for pipes");
close_all(pipes);
return result;
}
switch (pid = fork()) {
case -1:
result.err = errno;
perror("[exile] failed to fork");
close_all(pipes);
return result;
case 0: // child
close(STDIN_FILENO);
close(STDOUT_FILENO);
if (dup2(r_cmdin, STDIN_FILENO) < 0) {
perror("[exile] failed to dup to stdin");
/* We are assuming FORK_EXEC_FAILURE exit code wont be used by the command
* we are running. Technically we can not assume any exit code here. The
* parent can not differentiate between exit before `exec` and the normal
* command exit.
* One correct way to solve this might be to have a separate
* pipe shared between child and parent and signaling the parent by
* closing it or writing to it. */
_exit(FORK_EXEC_FAILURE);
}
if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
perror("[exile] failed to dup to stdout");
_exit(FORK_EXEC_FAILURE);
}
if (stderr_to_console != true) {
close(STDERR_FILENO);
int dev_null = open("/dev/null", O_WRONLY);
if (dev_null == -1) {
perror("[exile] failed to open /dev/null");
_exit(FORK_EXEC_FAILURE);
}
if (dup2(dev_null, STDERR_FILENO) < 0) {
perror("[exile] failed to dup stderr");
_exit(FORK_EXEC_FAILURE);
}
close(dev_null);
}
close_all_fds();
execvp(args[0], args);
perror("[exile] execvp(): failed");
_exit(FORK_EXEC_FAILURE);
default: // parent
/* close file descriptors used by child */
close(r_cmdin);
close(w_cmdout);
result.success = true;
result.context.pid = pid;
result.context.cmd_input_fd = w_cmdin;
result.context.cmd_output_fd = r_cmdout;
return result;
}
}
/* TODO: return appropriate error instead returning generic "badarg" error */
static ERL_NIF_TERM execute(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
char tmp[MAX_ARGUMENTS][MAX_ARGUMENT_LEN + 1];
char *exec_args[MAX_ARGUMENTS + 1];
ErlNifTime start;
unsigned int args_len;
start = enif_monotonic_time(ERL_NIF_USEC);
if (enif_get_list_length(env, argv[0], &args_len) != true)
return enif_make_badarg(env);
if (args_len > MAX_ARGUMENTS)
return enif_make_badarg(env);
ERL_NIF_TERM head, tail, list = argv[0];
for (unsigned int i = 0; i < args_len; i++) {
if (enif_get_list_cell(env, list, &head, &tail) != true)
return enif_make_badarg(env);
if (enif_get_string(env, head, tmp[i], MAX_ARGUMENT_LEN, ERL_NIF_LATIN1) <
1)
return enif_make_badarg(env);
exec_args[i] = tmp[i];
list = tail;
}
exec_args[args_len] = NULL;
bool stderr_to_console = true;
int tmp_int;
if (enif_get_int(env, argv[1], &tmp_int) != true)
return enif_make_badarg(env);
stderr_to_console = tmp_int == 1 ? true : false;
struct ExilePriv *data = enif_priv_data(env);
StartProcessResult result = start_proccess(exec_args, stderr_to_console);
ExecContext *ctx = NULL;
ERL_NIF_TERM term;
if (result.success) {
ctx = enif_alloc_resource(data->exec_ctx_rt, sizeof(ExecContext));
ctx->cmd_input_fd = result.context.cmd_input_fd;
ctx->cmd_output_fd = result.context.cmd_output_fd;
ctx->read_resource = enif_alloc_resource(data->io_rt, sizeof(int));
ctx->write_resource = enif_alloc_resource(data->io_rt, sizeof(int));
ctx->pid = result.context.pid;
debug("pid: %d cmd_in_fd: %d cmd_out_fd: %d", ctx->pid, ctx->cmd_input_fd,
ctx->cmd_output_fd);
term = enif_make_resource(env, ctx);
/* resource should be collected beam GC when there are no more references */
enif_release_resource(ctx);
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
return make_ok(env, term);
} else {
return make_error(env, enif_make_int(env, result.err));
}
}
static int select_write(ErlNifEnv *env, ExecContext *ctx) {
int retval = enif_select(env, ctx->cmd_input_fd, ERL_NIF_SELECT_WRITE,
ctx->write_resource, NULL, ATOM_UNDEFINED);
if (retval != 0)
perror("select_write()");
return retval;
}
static ERL_NIF_TERM sys_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 2)
enif_make_badarg(env);
ErlNifTime start;
start = enif_monotonic_time(ERL_NIF_USEC);
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->cmd_input_fd == PIPE_CLOSED)
return make_error(env, ATOM_PIPE_CLOSED);
ErlNifBinary bin;
if (enif_inspect_binary(env, argv[1], &bin) != true)
return enif_make_badarg(env);
ssize_t result = write(ctx->cmd_input_fd, bin.data, bin.size);
int write_errono = errno;
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
/* TODO: branching is ugly, cleanup required */
if (result >= (ssize_t)bin.size) { // request completely satisfied
return make_ok(env, enif_make_int(env, result));
} else if (result >= 0) { // request partially satisfied
int retval = select_write(env, ctx);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, enif_make_int(env, result));
} else if (write_errono == EAGAIN) { // busy
int retval = select_write(env, ctx);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else { // Error
perror("write()");
return make_error(env, enif_make_int(env, write_errono));
}
}
static ERL_NIF_TERM sys_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
int kind;
enif_get_int(env, argv[1], &kind);
int result;
switch (kind) {
case 0:
if (ctx->cmd_input_fd == PIPE_CLOSED) {
return ATOM_OK;
} else {
enif_select(env, ctx->cmd_input_fd, ERL_NIF_SELECT_STOP,
ctx->write_resource, NULL, ATOM_UNDEFINED);
result = close(ctx->cmd_input_fd);
if (result == 0) {
ctx->cmd_input_fd = PIPE_CLOSED;
return ATOM_OK;
} else {
perror("cmd_input_fd close()");
return make_error(env, enif_make_int(env, errno));
}
}
case 1:
if (ctx->cmd_output_fd == PIPE_CLOSED) {
return ATOM_OK;
} else {
enif_select(env, ctx->cmd_output_fd, ERL_NIF_SELECT_STOP,
ctx->read_resource, NULL, ATOM_UNDEFINED);
result = close(ctx->cmd_output_fd);
if (result == 0) {
ctx->cmd_output_fd = PIPE_CLOSED;
return ATOM_OK;
} else {
perror("cmd_output_fd close()");
return make_error(env, enif_make_int(env, errno));
}
}
default:
debug("invalid file descriptor type");
return enif_make_badarg(env);
}
}
static int select_read(ErlNifEnv *env, ExecContext *ctx) {
int retval = enif_select(env, ctx->cmd_output_fd, ERL_NIF_SELECT_READ,
ctx->read_resource, NULL, ATOM_UNDEFINED);
if (retval != 0)
perror("select_read()");
return retval;
}
static ERL_NIF_TERM sys_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 2)
enif_make_badarg(env);
ErlNifTime start;
start = enif_monotonic_time(ERL_NIF_USEC);
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->cmd_output_fd == PIPE_CLOSED)
return make_error(env, ATOM_PIPE_CLOSED);
- bool is_buffered = true;
- int size;
- enif_get_int(env, argv[1], &size);
+ int size, request;
- if (size == -1) {
- size = 65535;
- is_buffered = false;
- } else if (size < 1) {
+ enif_get_int(env, argv[1], &request);
+ size = request;
+
+ if (request == UNBUFFERED_READ) {
+ size = 65535; // we try to read as much we can
+ } else if (request < 1) {
enif_make_badarg(env);
- } else if (size > 65535) {
+ } else if (request > 65535) {
size = 65535;
}
unsigned char buf[size];
ssize_t result = read(ctx->cmd_output_fd, buf, size);
int read_errno = errno;
ERL_NIF_TERM bin_term = 0;
if (result >= 0) {
/* no need to release this binary */
unsigned char *temp = enif_make_new_binary(env, result, &bin_term);
memcpy(temp, buf, result);
}
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
- /* TODO: branching is ugly, cleanup required */
- if (result >= size ||
- (is_buffered == false && result >= 0)) { // request completely satisfied
- return make_ok(env, bin_term);
- } else if (result > 0) { // request partially satisfied
- int retval = select_read(env, ctx);
- if (retval != 0)
- return make_error(env, enif_make_int(env, retval));
- return make_ok(env, bin_term);
- } else if (result == 0) { // EOF
- return make_ok(env, bin_term);
- } else if (read_errno == EAGAIN) { // busy
- int retval = select_read(env, ctx);
- if (retval != 0)
- return make_error(env, enif_make_int(env, retval));
- return make_error(env, ATOM_EAGAIN);
- } else { // Error
- perror("read()");
- return make_error(env, enif_make_int(env, read_errno));
+ if (result >= 0 ) {
+ // we do not 'select' if request completely satisfied OR EOF OR its UNBUFFERED_READ
+ if (result == request || result == 0 || request == UNBUFFERED_READ) {
+ return make_ok(env, bin_term);
+ } else { // request partially satisfied
+ int retval = select_read(env, ctx);
+ if (retval != 0)
+ return make_error(env, enif_make_int(env, retval));
+ return make_ok(env, bin_term);
+ }
+ } else {
+ if (read_errno == EAGAIN) { // busy
+ int retval = select_read(env, ctx);
+ if (retval != 0)
+ return make_error(env, enif_make_int(env, retval));
+ return make_error(env, ATOM_EAGAIN);
+ } else { // Error
+ perror("read()");
+ return make_error(env, enif_make_int(env, read_errno));
+ }
}
}
static ERL_NIF_TERM is_alive(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_ok(env, ATOM_TRUE);
int result = kill(ctx->pid, 0);
if (result == 0) {
return make_ok(env, ATOM_TRUE);
} else {
return make_ok(env, ATOM_FALSE);
}
}
static ERL_NIF_TERM sys_terminate(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_ok(env, enif_make_int(env, 0));
return make_ok(env, enif_make_int(env, kill(ctx->pid, SIGTERM)));
}
static ERL_NIF_TERM sys_kill(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_ok(env, enif_make_int(env, 0));
return make_ok(env, enif_make_int(env, kill(ctx->pid, SIGKILL)));
}
static ERL_NIF_TERM make_exit_term(ErlNifEnv *env, ExecContext *ctx) {
switch (ctx->exit_type) {
case NORMAL_EXIT:
return make_ok(env, enif_make_tuple2(env, ATOM_EXIT,
enif_make_int(env, ctx->exit_status)));
case SIGNALED:
/* exit_status here points to signal number */
return make_ok(env, enif_make_tuple2(env, ATOM_SIGNALED,
enif_make_int(env, ctx->exit_status)));
case STOPPED:
return make_ok(env, enif_make_tuple2(env, ATOM_STOPPED,
enif_make_int(env, ctx->exit_status)));
default:
error("Invalid wait status");
return make_error(env, ATOM_UNDEFINED);
}
}
static ERL_NIF_TERM sys_wait(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_exit_term(env, ctx);
int status;
int wpid = waitpid(ctx->pid, &status, WNOHANG);
if (wpid == ctx->pid) {
ctx->pid = CMD_EXIT;
if (WIFEXITED(status)) {
ctx->exit_type = NORMAL_EXIT;
ctx->exit_status = WEXITSTATUS(status);
} else if (WIFSIGNALED(status)) {
ctx->exit_type = SIGNALED;
ctx->exit_status = WTERMSIG(status);
} else if (WIFSTOPPED(status)) {
ctx->exit_type = STOPPED;
ctx->exit_status = 0;
}
return make_exit_term(env, ctx);
} else if (wpid != 0) {
perror("waitpid()");
}
ERL_NIF_TERM term = enif_make_tuple2(env, enif_make_int(env, wpid),
enif_make_int(env, status));
return make_error(env, term);
}
static ERL_NIF_TERM os_pid(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_ok(env, enif_make_int(env, 0));
return make_ok(env, enif_make_int(env, ctx->pid));
}
static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
struct ExilePriv *data = enif_alloc(sizeof(struct ExilePriv));
if (!data)
return 1;
data->exec_ctx_rt =
enif_open_resource_type_x(env, "exile_resource", &exec_ctx_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
data->io_rt =
enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
ATOM_TRUE = enif_make_atom(env, "true");
ATOM_FALSE = enif_make_atom(env, "false");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_UNDEFINED = enif_make_atom(env, "undefined");
ATOM_INVALID_CTX = enif_make_atom(env, "invalid_exile_exec_ctx");
ATOM_PIPE_CLOSED = enif_make_atom(env, "closed_pipe");
ATOM_EXIT = enif_make_atom(env, "exit");
ATOM_SIGNALED = enif_make_atom(env, "signaled");
ATOM_STOPPED = enif_make_atom(env, "stopped");
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ALLOC_FAILED = enif_make_atom(env, "alloc_failed");
*priv = (void *)data;
return 0;
}
static void on_unload(ErlNifEnv *env, void *priv) {
debug("exile unload");
enif_free(priv);
}
static ErlNifFunc nif_funcs[] = {
{"execute", 2, execute, USE_DIRTY_IO},
{"sys_write", 2, sys_write, USE_DIRTY_IO},
{"sys_read", 2, sys_read, USE_DIRTY_IO},
{"sys_close", 2, sys_close, USE_DIRTY_IO},
{"sys_terminate", 1, sys_terminate, USE_DIRTY_IO},
{"sys_wait", 1, sys_wait, USE_DIRTY_IO},
{"sys_kill", 1, sys_kill, USE_DIRTY_IO},
{"alive?", 1, is_alive, USE_DIRTY_IO},
{"os_pid", 1, os_pid, USE_DIRTY_IO},
};
ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index 92c42ff..756a451 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,265 +1,284 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
alias Exile.Process
test "read" do
{:ok, s} = Process.start_link("echo", ["test"])
assert {:eof, iodata} = Process.read(s, 100)
assert IO.iodata_to_binary(iodata) == "test\n"
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
end
test "write" do
{:ok, s} = Process.start_link("cat", [])
assert :ok == Process.write(s, "hello")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "hello"
assert :ok == Process.write(s, "world")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "world"
assert :ok == Process.close_stdin(s)
assert {:eof, []} == Process.read(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
end
test "stdin close" do
logger = start_events_collector()
# base64 produces output only after getting EOF from stdin. we
# collect events in order and assert that we can still read from
# stdout even after closing stdin
{:ok, s} = Process.start_link("base64", [])
# parallel reader should be blocked till we close stdin
start_parallel_reader(s, logger)
:timer.sleep(100)
assert :ok == Process.write(s, "hello")
add_event(logger, {:write, "hello"})
assert :ok == Process.write(s, "world")
add_event(logger, {:write, "world"})
:timer.sleep(100)
assert :ok == Process.close_stdin(s)
add_event(logger, :input_close)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 50)
assert [
{:write, "hello"},
{:write, "world"},
:input_close,
{:read, "aGVsbG93b3JsZA==\n"},
:eof
] == get_events(logger)
end
test "external command termination on stop" do
{:ok, s} = Process.start_link("cat", [])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
:timer.sleep(100)
refute os_process_alive?(os_pid)
end
test "external command kill on stop" do
# cat command hangs waiting for EOF
{:ok, s} = Process.start_link(fixture("ignore_sigterm.sh"), [])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
if os_process_alive?(os_pid) do
:timer.sleep(3000)
refute os_process_alive?(os_pid)
else
:ok
end
end
test "exit status" do
{:ok, s} = Process.start_link("sh", ~w(-c "exit 2"))
assert {:ok, {:exit, 2}} == Process.await_exit(s, 500)
end
+ test "writing binary larger than pipe buffer size" do
+ large_bin = generate_binary(5 * 65535)
+ {:ok, s} = Process.start_link("cat", [])
+
+ writer =
+ Task.async(fn ->
+ Process.write(s, large_bin)
+ Process.close_stdin(s)
+ end)
+
+ :timer.sleep(100)
+
+ {_, iodata} = Process.read(s, 5 * 65535)
+ Task.await(writer)
+
+ assert IO.iodata_length(iodata) == 5 * 65535
+ assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
+ end
+
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
{:ok, s} = Process.start_link("cat", [])
bin = generate_binary(65535)
# make buffer full
Process.write(s, bin)
writer =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.write(s, bin)
add_event(logger, {:write, i})
end)
Process.close_stdin(s)
end)
:timer.sleep(50)
reader =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.read(s, 65535)
add_event(logger, {:read, i})
# delay in reading should delay writes
:timer.sleep(10)
end)
end)
Task.await(writer)
Task.await(reader)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
assert [
write: 1,
read: 1,
write: 2,
read: 2,
write: 3,
read: 3,
write: 4,
read: 4,
write: 5,
read: 5,
write: 6,
read: 6,
write: 7,
read: 7,
write: 8,
read: 8,
write: 9,
read: 9,
write: 10,
read: 10
] == get_events(logger)
end
# this test does not work properly in linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link("sleep", ["60"])
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
Process.stop(s)
open_files = parse_lsof(bin)
assert [%{fd: "0", name: _, type: "PIPE"}, %{type: "PIPE", fd: "1", name: _}] = open_files
end
test "process kill with pending write" do
{:ok, s} = Process.start_link("cat", [])
{:ok, os_pid} = Process.os_pid(s)
large_data =
Stream.cycle(["test"]) |> Stream.take(500_000) |> Enum.to_list() |> IO.iodata_to_binary()
task =
Task.async(fn ->
try do
Process.write(s, large_data)
catch
:exit, reason -> reason
end
end)
:timer.sleep(200)
Process.stop(s)
:timer.sleep(3000)
refute os_process_alive?(os_pid)
assert {:normal, _} = Task.await(task)
end
def start_parallel_reader(proc_server, logger) do
spawn_link(fn -> reader_loop(proc_server, logger) end)
end
def reader_loop(proc_server, logger) do
case Process.read(proc_server) do
{:ok, data} ->
add_event(logger, {:read, data})
reader_loop(proc_server, logger)
{:eof, []} ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
Stream.repeatedly(fn -> "A" end) |> Enum.take(size) |> IO.iodata_to_binary()
end
end

File Metadata

Mime Type
text/x-diff
Expires
Fri, Nov 29, 10:45 AM (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
41199
Default Alt Text
(29 KB)

Event Timeline