diff --git a/README.md b/README.md
index 49cc776..a10c933 100644
--- a/README.md
+++ b/README.md
@@ -1,93 +1,77 @@
# Exile
Exile is an alternative to beam [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure, non-blocking io, and tries to fix issues with ports.
-At high-level, exile is built around the idea of having demand-driven, asynchronous interaction with external command. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses NIF. See [Rationale](#rationale) for details. It also provides stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as
+Exile is built around the idea of demand-driven, asynchronous interaction with an external command. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses a NIF. See [Rationale](#rationale) for details. It also provides a stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as:
``` elixir
-def audio_stream!(stream) do
- # read from stdin and write to stdout
- proc_stream = Exile.stream!("ffmpeg", ~w(-i - -f mp3 -))
-
- Task.async(fn ->
- Stream.into(stream, proc_stream)
- |> Stream.run()
- end)
-
- proc_stream
-end
-
-File.stream!("music_video.mkv", [], 65535)
-|> audio_stream!()
+Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
`Exile.stream!` is a convenience wrapper around `Exile.Process`. If you want more control over stdin, stdout, and the OS process, use `Exile.Process` directly.
**Note: Exile is experimental and still a work in progress. Exile is based on a NIF; please understand the implications before using it.**
## Rationale
Approaches and issues
#### Port
Port is the default way of executing external commands. This is okay when you have control over the external program's implementation and the interaction is minimal. Port has several important issues.
* it can end up creating [zombie processes](https://hexdocs.pm/elixir/Port.html#module-zombie-operating-system-processes)
* it cannot selectively close stdin. This is required when the external program acts on EOF from stdin
* it sends command output as a message to the beam process. This does not put back-pressure on the external program and can lead to exhausting VM memory
#### Port based solutions
-There are many port based libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. These solve the first two issues associated with ports: zombie process and selectively closing STDIN. But not the third issue: having back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally uses the port for reading the output and writing input. Note that these libraries are solving a different subset of issues and have different functionality, please check the relevant project page for details.
+There are many port based libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. These solve the first two issues associated with ports: zombie processes and selectively closing stdin. But they do not solve the third issue: back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally, they use a port for reading the output and writing the input. Note that these libraries solve a different subset of issues and have different functionality; please check the relevant project page for details.
* no back-pressure
* additional os process (middleware) for every execution of your program
* in a few cases, such as Porcelain, the user has to install this external program explicitly
* might not be suitable when the program requires constant communication between beam process and external program
#### [ExCmd](https://github.com/akash-akya/ex_cmd)
-This is my other stab at solving back pressure on the external program issue. ExCmd also uses a middleware for the operation. But unlike the above libraries, it uses named pipes (FIFO) for io instead of port. Back-pressure created by the underlying operating system.
+This is my other stab at solving the back-pressure issue for external programs. ExCmd also uses a middleware for the operation. But unlike the above libraries, it uses named pipes (FIFO) for io instead of a port. Back-pressure is created by blocking system calls.
__Issue__
-ExCmd uses named FIFO for blocking input and output operation for building back-pressure. The blocking happens at the system call level. For example, reading the output from the program internally resolves to a blocking [`read()`](http://man7.org/linux/man-pages/man2/read.2.html) system call. This blocks the dirty io scheduler indefinitely. Since the scheduler can not preempt system call, the scheduler will be blocked until `read()` returns. Worst case scenario: there are as many blocking read/write as there are dirty io schedulers. This can lead to starvation of other io operations, low throughput, and dreaded scheduler collapse.
+ExCmd uses blocking system calls for building back-pressure. For example, reading the output from the program internally resolves to a blocking [`read()`](http://man7.org/linux/man-pages/man2/read.2.html) system call. This blocks the dirty io scheduler indefinitely. Since the scheduler cannot preempt a system call, the scheduler will be blocked until `read()` returns. Worst case scenario: there are as many blocking reads/writes as there are dirty io schedulers. This can lead to starvation of other io operations, low throughput, and the dreaded scheduler collapse.
-As of now, there are no non-blocking io file operations available in the beam. Beam didn't allow opening FIFO files before OTP 21 for the same reason. One can fix this temporarily by starting beam with more dirty io schedulers.
+As of now, there are no non-blocking io file operations available to the user in beam. See ExCmd for a possible workaround for this issue.
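To illustrate the difference between the two modes, here is a minimal sketch in plain POSIX C (it is not taken from Exile's source). Once the read end of a pipe is in non-blocking mode, `read()` returns immediately with `EAGAIN` when no data is available instead of parking the calling thread. The helper names `make_nonblocking` and `read_would_block` are hypothetical and exist only for this example.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: add O_NONBLOCK to the fd's status flags.
 * Returns 0 on success, -1 on failure (errno is set by fcntl). */
static int make_nonblocking(int fd) {
  assert(fd >= 0);
  int flags = fcntl(fd, F_GETFL);
  if (flags == -1)
    return -1;
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Hypothetical helper: attempt a single read and report whether it
 * would have blocked. Returns 1 when no data is available yet
 * (EAGAIN/EWOULDBLOCK) and 0 when the read returned data, hit EOF,
 * or failed for another reason. Up to 16 bytes are consumed and
 * discarded, which is fine for a demonstration only. */
static int read_would_block(int fd) {
  char buf[16];
  ssize_t n = read(fd, buf, sizeof(buf));
  return n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK);
}
```

With a blocking descriptor the same `read()` on an empty pipe would not return until the writer produced data or closed its end, which is the situation described above for dirty io schedulers.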
## Exile
-Exile takes a different NIF based approach. As of now the only way to do non-blocking, asynchronous io operations in the beam is to make the system calls ourselves with NIF (or port-driver). Exile uses a non-blocking system calls for io, so schedulers are never blocked indefinitely. It also uses POSIX `select()`. ie so polling is not done in userland.
-
-Building back-pressure using non-blocking io is done by blocking the program at os level (using pipes) and blocking beam process *inside VM* using good old message massing, unlike ExCmd which uses blocking system calls.
+Exile does non-blocking, asynchronous io system calls using a NIF. Since it makes non-blocking system calls, schedulers are never blocked indefinitely. It does this in an asynchronous fashion using `enif_select`, so it's efficient.
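Non-blocking pipes still give back-pressure at the OS level: once the pipe buffer is full, the kernel refuses further writes until the reader catches up. The sketch below shows this with plain POSIX calls; it is an illustration under that assumption, not Exile's code, and `fill_until_eagain` is a hypothetical helper name.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: write zero-filled chunks to a non-blocking fd
 * until the kernel refuses more data. Returns the total number of
 * bytes accepted, or -1 on an unexpected error. */
static long fill_until_eagain(int wfd) {
  char chunk[4096];
  long total = 0;
  assert(wfd >= 0);
  memset(chunk, 0, sizeof(chunk));
  for (;;) {
    ssize_t n = write(wfd, chunk, sizeof(chunk));
    if (n > 0) {
      total += n;
    } else if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
      return total; /* pipe is full: the reader has to catch up first */
    } else {
      return -1; /* any other failure is not handled in this sketch */
    }
  }
}
```

In a NIF, the `EAGAIN` case is the point where one would register interest in the descriptor (for example with `enif_select`) and return to the caller rather than retrying in a loop.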
**Advantages over other approaches:**
-* solves all three issues of the port
+* solves all three issues associated with port
* it does not use any middleware
* no additional os process. no performance/resource cost
- * it is faster as there is no hop in between
* no need to install any external command
* can run many external programs in parallel without adversely affecting schedulers
* stream abstraction for interacting with the external program
* should be portable across POSIX compliant operating systems (not tested)
If you are executing a huge number of external programs **concurrently** (more than a few hundred) you might have to increase the open file descriptors limit (`ulimit -n`)
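Each running program keeps two pipe descriptors open on the beam side (one for the command's stdin, one for its stdout), so the descriptor budget is worth checking before running many programs at once. As a rough sketch, the limits that `ulimit -n` reports can also be read from C with `getrlimit`; the helper name `open_fd_limits` is hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/resource.h>

/* Hypothetical helper: fetch the soft and hard limits on open file
 * descriptors for the current process. Returns 0 on success and -1
 * on failure (errno is set by getrlimit). */
static int open_fd_limits(rlim_t *soft, rlim_t *hard) {
  struct rlimit lim;
  assert(soft != NULL && hard != NULL);
  if (getrlimit(RLIMIT_NOFILE, &lim) == -1)
    return -1;
  *soft = lim.rlim_cur; /* the value `ulimit -n` shows by default */
  *hard = lim.rlim_max; /* ceiling the soft limit can be raised to */
  return 0;
}
```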
-Non-blocking io can be used for other interesting things. Such as reading named pipe (FIFO) files. `Exile.stream!("cat", ["data.pipe"])` does not block schedulers unlike default `file` based io.
+Non-blocking io can be used for other interesting things, such as reading named pipe (FIFO) files. `Exile.stream!(~w(cat data.pipe))` does not block schedulers, so you can open hundreds of FIFO files, unlike the default `file` based io.
##### TODO
* add benchmark results
### 🚨 Obligatory NIF warning
As with any NIF based solution, bugs or issues in the Exile implementation **can bring down the beam VM**. But the NIF implementation is comparatively small and mostly uses POSIX system calls, spawned external processes are still completely isolated at the OS level, and the port issues it tries to solve are critical.
### Usage
If all you want is to run a command with no communication, then just sticking with `System.cmd` is a better option.
For most use-cases the `Exile.stream!` abstraction should be enough. Use `Exile.Process` only if you need more control over the life-cycle of the IO streams and the OS process.
diff --git a/c_src/exile.c b/c_src/exile.c
index 781dacc..efd399d 100644
--- a/c_src/exile.c
+++ b/c_src/exile.c
@@ -1,672 +1,672 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "erl_nif.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#ifdef ERTS_DIRTY_SCHEDULERS
#define USE_DIRTY_IO ERL_NIF_DIRTY_JOB_IO_BOUND
#else
#define USE_DIRTY_IO 0
#endif
//#define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
#define start_timing() ErlNifTime __start = enif_monotonic_time(ERL_NIF_USEC)
#define elapsed_microseconds() (enif_monotonic_time(ERL_NIF_USEC) - __start)
#else
#define debug(...)
#define start_timing()
#define elapsed_microseconds() 0
#endif
#define error(...) \
do { \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
#define GET_CTX(env, arg, ctx) \
do { \
ExilePriv *data = enif_priv_data(env); \
if (enif_get_resource(env, arg, data->exec_ctx_rt, (void **)&ctx) == \
false) { \
return make_error(env, ATOM_INVALID_CTX); \
} \
} while (0);
static const int PIPE_READ = 0;
static const int PIPE_WRITE = 1;
static const int PIPE_CLOSED = -1;
static const int CMD_EXIT = -1;
static const int MAX_ARGUMENTS = 20;
static const int MAX_ARGUMENT_LEN = 1024;
static const int UNBUFFERED_READ = -1;
static const int PIPE_BUF_SIZE = 65535;
/* We are choosing an exit code which is not reserved see:
* https://www.tldp.org/LDP/abs/html/exitcodes.html. */
static const int FORK_EXEC_FAILURE = 125;
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_UNDEFINED;
static ERL_NIF_TERM ATOM_INVALID_CTX;
static ERL_NIF_TERM ATOM_PIPE_CLOSED;
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ALLOC_FAILED;
/* command exit types */
static ERL_NIF_TERM ATOM_EXIT;
static ERL_NIF_TERM ATOM_SIGNALED;
static ERL_NIF_TERM ATOM_STOPPED;
enum exec_status {
SUCCESS,
PIPE_CREATE_ERROR,
PIPE_FLAG_ERROR,
FORK_ERROR,
PIPE_DUP_ERROR,
NULL_DEV_OPEN_ERROR,
};
enum exit_type { NORMAL_EXIT, SIGNALED, STOPPED };
typedef struct ExilePriv {
ErlNifResourceType *exec_ctx_rt;
ErlNifResourceType *io_rt;
} ExilePriv;
typedef struct ExecContext {
int cmd_input_fd;
int cmd_output_fd;
int exit_status; // can be exit status or signal number depending on exit_type
enum exit_type exit_type;
pid_t pid;
// these are to hold enif_select resource objects
int *read_resource;
int *write_resource;
} ExecContext;
typedef struct StartProcessResult {
bool success;
int err;
ExecContext context;
} StartProcessResult;
/* TODO: assert if the external process is exit (?) */
static void exec_ctx_dtor(ErlNifEnv *env, void *obj) {
ExecContext *ctx = obj;
enif_release_resource(ctx->read_resource);
enif_release_resource(ctx->write_resource);
debug("Exile exec_ctx_dtor called");
}
static void exec_ctx_stop(ErlNifEnv *env, void *obj, int fd,
int is_direct_call) {
debug("Exile exec_ctx_stop called");
}
static void exec_ctx_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
debug("Exile exec_ctx_down called");
}
static ErlNifResourceTypeInit exec_ctx_rt_init = {exec_ctx_dtor, exec_ctx_stop,
exec_ctx_down};
static void io_resource_dtor(ErlNifEnv *env, void *obj) {
debug("Exile io_resource_dtor called");
}
static void io_resource_stop(ErlNifEnv *env, void *obj, int fd,
int is_direct_call) {
debug("Exile io_resource_stop called %d", fd);
}
static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
debug("Exile io_resource_down called");
}
static ErlNifResourceTypeInit io_rt_init = {io_resource_dtor, io_resource_stop,
io_resource_down};
static inline ERL_NIF_TERM make_ok(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_OK, term);
}
static inline ERL_NIF_TERM make_error(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_ERROR, term);
}
static int set_flag(int fd, int flags) {
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
}
static void close_all(int pipes[2][2]) {
for (int i = 0; i < 2; i++) {
if (pipes[i][PIPE_READ] > 0)
close(pipes[i][PIPE_READ]);
if (pipes[i][PIPE_WRITE] > 0)
close(pipes[i][PIPE_WRITE]);
}
}
/* time is assumed to be in microseconds */
static void notify_consumed_timeslice(ErlNifEnv *env, ErlNifTime start,
ErlNifTime stop) {
ErlNifTime pct;
pct = (ErlNifTime)((stop - start) / 10);
if (pct > 100)
pct = 100;
else if (pct == 0)
pct = 1;
enif_consume_timeslice(env, pct);
}
/* This is not ideal, but as of now there is no portable way to do this */
static void close_all_fds() {
int fd_limit = (int)sysconf(_SC_OPEN_MAX);
for (int i = STDERR_FILENO + 1; i < fd_limit; i++)
close(i);
}
-static StartProcessResult start_proccess(char *args[], bool stderr_to_console) {
+static StartProcessResult start_process(char *args[], bool stderr_to_console) {
StartProcessResult result = {.success = false};
pid_t pid;
int pipes[2][2] = {{0, 0}, {0, 0}};
if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1) {
result.err = errno;
perror("[exile] failed to create pipes");
close_all(pipes);
return result;
}
const int r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
const int w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
const int r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
const int w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0) {
result.err = errno;
perror("[exile] failed to set flags for pipes");
close_all(pipes);
return result;
}
switch (pid = fork()) {
case -1:
result.err = errno;
perror("[exile] failed to fork");
close_all(pipes);
return result;
case 0: // child
close(STDIN_FILENO);
close(STDOUT_FILENO);
if (dup2(r_cmdin, STDIN_FILENO) < 0) {
perror("[exile] failed to dup to stdin");
/* We are assuming FORK_EXEC_FAILURE exit code won't be used by the command
* we are running. Technically we can not assume any exit code here. The
* parent can not differentiate between exit before `exec` and the normal
* command exit.
* One correct way to solve this might be to have a separate
* pipe shared between child and parent and signaling the parent by
* closing it or writing to it. */
_exit(FORK_EXEC_FAILURE);
}
if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
perror("[exile] failed to dup to stdout");
_exit(FORK_EXEC_FAILURE);
}
if (stderr_to_console != true) {
close(STDERR_FILENO);
int dev_null = open("/dev/null", O_WRONLY);
if (dev_null == -1) {
perror("[exile] failed to open /dev/null");
_exit(FORK_EXEC_FAILURE);
}
if (dup2(dev_null, STDERR_FILENO) < 0) {
perror("[exile] failed to dup stderr");
_exit(FORK_EXEC_FAILURE);
}
close(dev_null);
}
close_all_fds();
execvp(args[0], args);
perror("[exile] execvp(): failed");
_exit(FORK_EXEC_FAILURE);
default: // parent
/* close file descriptors used by child */
close(r_cmdin);
close(w_cmdout);
result.success = true;
result.context.pid = pid;
result.context.cmd_input_fd = w_cmdin;
result.context.cmd_output_fd = r_cmdout;
return result;
}
}
/* TODO: return appropriate error instead returning generic "badarg" error */
static ERL_NIF_TERM execute(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
char tmp[MAX_ARGUMENTS][MAX_ARGUMENT_LEN + 1];
char *exec_args[MAX_ARGUMENTS + 1];
ErlNifTime start;
unsigned int args_len;
start = enif_monotonic_time(ERL_NIF_USEC);
if (enif_get_list_length(env, argv[0], &args_len) != true)
return enif_make_badarg(env);
if (args_len > MAX_ARGUMENTS)
return enif_make_badarg(env);
ERL_NIF_TERM head, tail, list = argv[0];
for (unsigned int i = 0; i < args_len; i++) {
if (enif_get_list_cell(env, list, &head, &tail) != true)
return enif_make_badarg(env);
if (enif_get_string(env, head, tmp[i], MAX_ARGUMENT_LEN, ERL_NIF_LATIN1) <
1)
return enif_make_badarg(env);
exec_args[i] = tmp[i];
list = tail;
}
exec_args[args_len] = NULL;
bool stderr_to_console = true;
int tmp_int;
if (enif_get_int(env, argv[1], &tmp_int) != true)
return enif_make_badarg(env);
stderr_to_console = tmp_int == 1 ? true : false;
struct ExilePriv *data = enif_priv_data(env);
- StartProcessResult result = start_proccess(exec_args, stderr_to_console);
+ StartProcessResult result = start_process(exec_args, stderr_to_console);
ExecContext *ctx = NULL;
ERL_NIF_TERM term;
if (result.success) {
ctx = enif_alloc_resource(data->exec_ctx_rt, sizeof(ExecContext));
ctx->cmd_input_fd = result.context.cmd_input_fd;
ctx->cmd_output_fd = result.context.cmd_output_fd;
ctx->read_resource = enif_alloc_resource(data->io_rt, sizeof(int));
ctx->write_resource = enif_alloc_resource(data->io_rt, sizeof(int));
ctx->pid = result.context.pid;
debug("pid: %d cmd_in_fd: %d cmd_out_fd: %d", ctx->pid, ctx->cmd_input_fd,
ctx->cmd_output_fd);
term = enif_make_resource(env, ctx);
/* resource should be collected by the beam GC when there are no more references */
enif_release_resource(ctx);
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
return make_ok(env, term);
} else {
return make_error(env, enif_make_int(env, result.err));
}
}
static int select_write(ErlNifEnv *env, ExecContext *ctx) {
int retval = enif_select(env, ctx->cmd_input_fd, ERL_NIF_SELECT_WRITE,
ctx->write_resource, NULL, ATOM_UNDEFINED);
if (retval != 0)
perror("select_write()");
return retval;
}
static ERL_NIF_TERM sys_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 2)
return enif_make_badarg(env);
ErlNifTime start;
start = enif_monotonic_time(ERL_NIF_USEC);
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->cmd_input_fd == PIPE_CLOSED)
return make_error(env, ATOM_PIPE_CLOSED);
ErlNifBinary bin;
if (enif_inspect_binary(env, argv[1], &bin) != true)
return enif_make_badarg(env);
if (bin.size == 0)
return enif_make_badarg(env);
/* should we limit the bin.size here? */
ssize_t result = write(ctx->cmd_input_fd, bin.data, bin.size);
int write_errono = errno;
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
/* TODO: branching is ugly, cleanup required */
if (result >= (ssize_t)bin.size) { // request completely satisfied
return make_ok(env, enif_make_int(env, result));
} else if (result >= 0) { // request partially satisfied
int retval = select_write(env, ctx);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, enif_make_int(env, result));
} else if (write_errono == EAGAIN) { // busy
int retval = select_write(env, ctx);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else { // Error
perror("write()");
return make_error(env, enif_make_int(env, write_errono));
}
}
static ERL_NIF_TERM sys_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
int kind;
enif_get_int(env, argv[1], &kind);
int result;
switch (kind) {
case 0:
if (ctx->cmd_input_fd == PIPE_CLOSED) {
return ATOM_OK;
} else {
enif_select(env, ctx->cmd_input_fd, ERL_NIF_SELECT_STOP,
ctx->write_resource, NULL, ATOM_UNDEFINED);
result = close(ctx->cmd_input_fd);
if (result == 0) {
ctx->cmd_input_fd = PIPE_CLOSED;
return ATOM_OK;
} else {
perror("cmd_input_fd close()");
return make_error(env, enif_make_int(env, errno));
}
}
case 1:
if (ctx->cmd_output_fd == PIPE_CLOSED) {
return ATOM_OK;
} else {
enif_select(env, ctx->cmd_output_fd, ERL_NIF_SELECT_STOP,
ctx->read_resource, NULL, ATOM_UNDEFINED);
result = close(ctx->cmd_output_fd);
if (result == 0) {
ctx->cmd_output_fd = PIPE_CLOSED;
return ATOM_OK;
} else {
perror("cmd_output_fd close()");
return make_error(env, enif_make_int(env, errno));
}
}
default:
debug("invalid file descriptor type");
return enif_make_badarg(env);
}
}
static int select_read(ErlNifEnv *env, ExecContext *ctx) {
int retval = enif_select(env, ctx->cmd_output_fd, ERL_NIF_SELECT_READ,
ctx->read_resource, NULL, ATOM_UNDEFINED);
if (retval != 0)
perror("select_read()");
return retval;
}
static ERL_NIF_TERM sys_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 2)
return enif_make_badarg(env);
ErlNifTime start;
start = enif_monotonic_time(ERL_NIF_USEC);
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->cmd_output_fd == PIPE_CLOSED)
return make_error(env, ATOM_PIPE_CLOSED);
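int size, request;
/* reject non-integer sizes instead of reading an uninitialized value */
if (enif_get_int(env, argv[1], &request) != true)
return enif_make_badarg(env);
size = request;
if (request == UNBUFFERED_READ) {
size = PIPE_BUF_SIZE;
} else if (request < 1) {
/* return here so a non-positive size never reaches the buffer below */
return enif_make_badarg(env);
} else if (request > PIPE_BUF_SIZE) {
size = PIPE_BUF_SIZE;
}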
unsigned char buf[size];
ssize_t result = read(ctx->cmd_output_fd, buf, size);
int read_errno = errno;
ERL_NIF_TERM bin_term = 0;
if (result >= 0) {
/* no need to release this binary */
unsigned char *temp = enif_make_new_binary(env, result, &bin_term);
memcpy(temp, buf, result);
}
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (result >= 0) {
/* we do not 'select' if request completely satisfied OR EOF OR its
* UNBUFFERED_READ */
if (result == request || result == 0 || request == UNBUFFERED_READ) {
return make_ok(env, bin_term);
} else { // request partially satisfied
int retval = select_read(env, ctx);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, bin_term);
}
} else {
if (read_errno == EAGAIN) { // busy
int retval = select_read(env, ctx);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else { // Error
perror("read()");
return make_error(env, enif_make_int(env, read_errno));
}
}
}
static ERL_NIF_TERM is_alive(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_ok(env, ATOM_TRUE);
int result = kill(ctx->pid, 0);
if (result == 0) {
return make_ok(env, ATOM_TRUE);
} else {
return make_ok(env, ATOM_FALSE);
}
}
static ERL_NIF_TERM sys_terminate(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_ok(env, enif_make_int(env, 0));
return make_ok(env, enif_make_int(env, kill(ctx->pid, SIGTERM)));
}
static ERL_NIF_TERM sys_kill(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_ok(env, enif_make_int(env, 0));
return make_ok(env, enif_make_int(env, kill(ctx->pid, SIGKILL)));
}
static ERL_NIF_TERM make_exit_term(ErlNifEnv *env, ExecContext *ctx) {
switch (ctx->exit_type) {
case NORMAL_EXIT:
return make_ok(env, enif_make_tuple2(env, ATOM_EXIT,
enif_make_int(env, ctx->exit_status)));
case SIGNALED:
/* exit_status here points to signal number */
return make_ok(env, enif_make_tuple2(env, ATOM_SIGNALED,
enif_make_int(env, ctx->exit_status)));
case STOPPED:
return make_ok(env, enif_make_tuple2(env, ATOM_STOPPED,
enif_make_int(env, ctx->exit_status)));
default:
error("Invalid wait status");
return make_error(env, ATOM_UNDEFINED);
}
}
static ERL_NIF_TERM sys_wait(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_exit_term(env, ctx);
int status;
int wpid = waitpid(ctx->pid, &status, WNOHANG);
if (wpid == ctx->pid) {
ctx->pid = CMD_EXIT;
if (WIFEXITED(status)) {
ctx->exit_type = NORMAL_EXIT;
ctx->exit_status = WEXITSTATUS(status);
} else if (WIFSIGNALED(status)) {
ctx->exit_type = SIGNALED;
ctx->exit_status = WTERMSIG(status);
} else if (WIFSTOPPED(status)) {
ctx->exit_type = STOPPED;
ctx->exit_status = 0;
}
return make_exit_term(env, ctx);
} else if (wpid != 0) {
perror("waitpid()");
}
ERL_NIF_TERM term = enif_make_tuple2(env, enif_make_int(env, wpid),
enif_make_int(env, status));
return make_error(env, term);
}
static ERL_NIF_TERM os_pid(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ExecContext *ctx = NULL;
GET_CTX(env, argv[0], ctx);
if (ctx->pid == CMD_EXIT)
return make_ok(env, enif_make_int(env, 0));
return make_ok(env, enif_make_int(env, ctx->pid));
}
static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
struct ExilePriv *data = enif_alloc(sizeof(struct ExilePriv));
if (!data)
return 1;
data->exec_ctx_rt =
enif_open_resource_type_x(env, "exile_resource", &exec_ctx_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
data->io_rt =
enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
ATOM_TRUE = enif_make_atom(env, "true");
ATOM_FALSE = enif_make_atom(env, "false");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_UNDEFINED = enif_make_atom(env, "undefined");
ATOM_INVALID_CTX = enif_make_atom(env, "invalid_exile_exec_ctx");
ATOM_PIPE_CLOSED = enif_make_atom(env, "closed_pipe");
ATOM_EXIT = enif_make_atom(env, "exit");
ATOM_SIGNALED = enif_make_atom(env, "signaled");
ATOM_STOPPED = enif_make_atom(env, "stopped");
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ALLOC_FAILED = enif_make_atom(env, "alloc_failed");
*priv = (void *)data;
return 0;
}
static void on_unload(ErlNifEnv *env, void *priv) {
debug("exile unload");
enif_free(priv);
}
static ErlNifFunc nif_funcs[] = {
{"execute", 2, execute, USE_DIRTY_IO},
{"sys_write", 2, sys_write, USE_DIRTY_IO},
{"sys_read", 2, sys_read, USE_DIRTY_IO},
{"sys_close", 2, sys_close, USE_DIRTY_IO},
{"sys_terminate", 1, sys_terminate, USE_DIRTY_IO},
{"sys_wait", 1, sys_wait, USE_DIRTY_IO},
{"sys_kill", 1, sys_kill, USE_DIRTY_IO},
{"alive?", 1, is_alive, USE_DIRTY_IO},
{"os_pid", 1, os_pid, USE_DIRTY_IO},
};
ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
diff --git a/lib/exile.ex b/lib/exile.ex
index 93ca7a0..925e103 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,44 +1,43 @@
defmodule Exile do
@moduledoc """
Exile is an alternative for beam ports with back-pressure and non-blocking IO
"""
@doc """
- Returns a `Exile.Stream` for the given `cmd` with arguments `args`.
+ Runs the given command with arguments and returns an Enumerable to read the command output.
- The stream implements both `Enumerable` and `Collectable` protocols, which means it can be used both for reading from stdout and write to stdin of an OS process simultaneously (see examples).
-
- `Exile.Stream` works with [iodata](https://hexdocs.pm/elixir/IO.html#module-io-data), chunks emitted by Enumerable is iodata as well. If you want binary please use (`IO.iodata_to_binary/1`)[https://hexdocs.pm/elixir/IO.html#iodata_to_binary/1].
+ First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
+ * `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
+ 1. input as Enumerable:
+ ```elixir
+ # List
+ Exile.stream!(~w(bc -q), input: ["1+1\n", "2*2\n"]) |> Enum.to_list()
+
+ # Stream
+ Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
+ ```
+ 2. input as collectable:
+ If the input is a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. The return value of the function is ignored.
+ ```elixir
+ Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
+ |> Enum.to_list()
+ ```
+ By default, no input is given to the command
* `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
- * `chunk_size` - Size of each iodata chunk emitted by Enumerable stream. When set to `nil` the output is unbuffered. Use `nil` this if want to receive that data as soon as its flused by the external program. Defaults to 65535
+ * `chunk_size` - Size of each iodata chunk emitted by Enumerable stream. When set to `nil` the output is unbuffered and chunk size will be variable. Defaults to 65535
All other options are passed to `Exile.Process.start_link/3`
- Since execution of external program is independent of beam process, once should prefer feeding input and getting output in separate processes.
-
### Examples
``` elixir
- def audio_stream!(stream) do
- # read from stdin and write to stdout
- proc_stream = Exile.stream!("ffmpeg", ~w(-i - -f mp3 -))
-
- Task.async(fn ->
- Stream.into(stream, proc_stream)
- |> Stream.run()
- end)
-
- proc_stream
- end
-
- File.stream!("music_video.mkv", [], 65535)
- |> audio_stream!()
+ Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
"""
- def stream!(cmd, args \\ [], opts \\ %{}) do
- Exile.Stream.__build__(cmd, args, opts)
+ def stream!(cmd_with_args, opts \\ []) do
+ Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index 383a318..7dff1be 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,380 +1,379 @@
defmodule Exile.Process do
@moduledoc """
GenServer which wraps spawned external command.
One should prefer `Exile.stream!` over `Exile.Process`. The stream internally manages this server for you. Use this only if you need more control over the life-cycle of the OS process.
## Overview
`Exile.Process` is an alternative primitive for Port. It has a different interface and approach to running external programs to solve the issues associated with ports.
### When compared to Port
* it is demand driven. The user explicitly has to `read` the output of the command, and the progress of the external command is controlled using OS pipes. So unlike Port, this never causes memory issues in beam by loading more than we can consume
* it can close stdin of the program explicitly
* does not create zombie process. It always tries to cleanup resources
At a high level it makes non-blocking asynchronous system calls to execute and interact with the external program. It completely bypasses the beam implementation for the same using a NIF. It uses the `select()` system call for asynchronous IO. Most of the system calls are non-blocking, so it does not adversely affect schedulers or cause issues such as "scheduler collapse".
### Obligatory NIF warning
As with any NIF based solution, bugs or issues in the Exile implementation can bring down the beam VM. But the NIF implementation is comparatively small and mostly uses POSIX system calls, spawned external processes are still completely isolated at the OS level, and the port issues it tries to solve are critical.
"""
alias Exile.ProcessNif
require Logger
use GenServer
defmacro fork_exec_failure(), do: 125
# delay between exit_check when io is busy (in milliseconds)
- @default_opts %{io_exit_check_delay: 1, stderr_to_console: false}
+ @default_opts [io_exit_check_delay: 1, stderr_to_console: false]
- def start_link(cmd, args, opts \\ %{}) do
- opts = Map.merge(@default_opts, opts)
- GenServer.start(__MODULE__, %{cmd: cmd, args: args, opts: opts})
+ def start_link(cmd_with_args, opts \\ []) do
+ opts = Keyword.merge(@default_opts, opts)
+ GenServer.start(__MODULE__, %{cmd_with_args: cmd_with_args, opts: opts})
end
def close_stdin(process) do
GenServer.call(process, :close_stdin, :infinity)
end
def write(process, iodata) do
GenServer.call(process, {:write, IO.iodata_to_binary(iodata)}, :infinity)
end
def read(process, size) when is_integer(size) or size == :unbuffered do
GenServer.call(process, {:read, size}, :infinity)
end
def read(process) do
GenServer.call(process, {:read, :unbuffered}, :infinity)
end
def kill(process, signal) when signal in [:sigkill, :sigterm] do
GenServer.call(process, {:kill, signal}, :infinity)
end
def await_exit(process, timeout \\ :infinity) do
GenServer.call(process, {:await_exit, timeout}, :infinity)
end
def os_pid(process) do
GenServer.call(process, :os_pid, :infinity)
end
def stop(process), do: GenServer.call(process, :stop, :infinity)
## Server
defmodule Pending do
defstruct bin: [], remaining: 0, client_pid: nil
end
defstruct [
- :cmd,
- :cmd_args,
+ :cmd_with_args,
:opts,
:errno,
:context,
:status,
await: %{},
pending_read: nil,
pending_write: nil
]
alias __MODULE__
- def init(%{cmd: cmd, args: args, opts: opts}) do
- path = :os.find_executable(to_charlist(cmd))
+ def init(args) do
+ %{cmd_with_args: [cmd | args], opts: opts} = args
+ path = System.find_executable(cmd)
unless path do
raise "Command not found: #{cmd}"
end
state = %__MODULE__{
- cmd: path,
- cmd_args: args,
+ cmd_with_args: [path | args],
opts: opts,
errno: nil,
status: :init,
await: %{},
pending_read: %Pending{},
pending_write: %Pending{}
}
{:ok, state, {:continue, nil}}
end
def handle_continue(nil, state) do
- exec_args = Enum.map(state.cmd_args, &to_charlist/1)
- stderr_to_console = if state.opts.stderr_to_console, do: 1, else: 0
+ exec_args = Enum.map(state.cmd_with_args, &to_charlist/1)
+ stderr_to_console = if state.opts[:stderr_to_console], do: 1, else: 0
- case ProcessNif.execute([state.cmd | exec_args], stderr_to_console) do
+ case ProcessNif.execute(exec_args, stderr_to_console) do
{:ok, context} ->
start_watcher(context)
{:noreply, %Process{state | context: context, status: :start}}
{:error, errno} ->
- raise "Failed to start command: #{state.cmd}, errno: #{errno}"
+ raise "Failed to start command: #{state.cmd_with_args}, errno: #{errno}"
end
end
def handle_call(:stop, _from, state) do
# watcher will take care of termination of external process
# TODO: pending write and read should receive "stopped" return
# value instead of exit signal
{:stop, :normal, :ok, state}
end
def handle_call(_, _from, %{status: {:exit, status}} = state), do: {:reply, {:error, {:exit, status}}, state}
def handle_call({:await_exit, timeout}, from, state) do
tref =
if timeout != :infinity do
Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
else
nil
end
state = put_timer(state, from, :timeout, tref)
check_exit(state, from)
end
def handle_call({:write, binary}, from, state) when is_binary(binary) do
pending = %Pending{bin: binary, client_pid: from}
do_write(%Process{state | pending_write: pending})
end
def handle_call({:read, size}, from, state) do
pending = %Pending{remaining: size, client_pid: from}
do_read(%Process{state | pending_read: pending})
end
def handle_call(:close_stdin, _from, state), do: do_close(state, :stdin)
def handle_call(:os_pid, _from, state), do: {:reply, ProcessNif.os_pid(state.context), state}
def handle_call({:kill, signal}, _from, state) do
do_kill(state.context, signal)
{:reply, :ok, %{state | status: {:exit, :killed}}}
end
def handle_info({:check_exit, from}, state), do: check_exit(state, from)
def handle_info({:await_exit_timeout, from}, state) do
cancel_timer(state, from, :check)
receive do
{:check_exit, ^from} -> :ok
after
0 -> :ok
end
GenServer.reply(from, :timeout)
{:noreply, clear_await(state, from)}
end
def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
def handle_info({:select, _read_resource, _ref, :ready_input}, state), do: do_read(state)
def handle_info(msg, _state), do: raise(msg)
defp do_write(%Process{pending_write: %Pending{bin: <<>>}} = state) do
GenServer.reply(state.pending_write.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
defp do_write(%Process{pending_write: pending} = state) do
case ProcessNif.sys_write(state.context, pending.bin) do
{:ok, size} ->
if size < byte_size(pending.bin) do
binary = binary_part(pending.bin, size, byte_size(pending.bin) - size)
{:noreply, %{state | pending_write: %Pending{pending | bin: binary}}}
else
GenServer.reply(pending.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | errno: errno}}
end
end
defp do_read(%Process{pending_read: %Pending{remaining: :unbuffered} = pending} = state) do
case ProcessNif.sys_read(state.context, -1) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, []})
{:noreply, state}
{:ok, binary} ->
GenServer.reply(pending.client_pid, {:ok, binary})
{:noreply, state}
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | errno: errno}}
end
end
defp do_read(%Process{pending_read: pending} = state) do
case ProcessNif.sys_read(state.context, pending.remaining) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, pending.bin})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
if byte_size(binary) < pending.remaining do
pending = %Pending{
pending
| bin: [pending.bin | binary],
remaining: pending.remaining - byte_size(binary)
}
{:noreply, %Process{state | pending_read: pending}}
else
GenServer.reply(pending.client_pid, {:ok, [state.pending_read.bin | binary]})
{:noreply, %Process{state | pending_read: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | pending_read: %Pending{}, errno: errno}}
end
end
defp check_exit(state, from) do
case ProcessNif.sys_wait(state.context) do
{:ok, {:exit, fork_exec_failure()}} ->
GenServer.reply(from, {:error, :failed_to_execute})
cancel_timer(state, from, :timeout)
{:noreply, clear_await(state, from)}
{:ok, status} ->
GenServer.reply(from, {:ok, status})
cancel_timer(state, from, :timeout)
{:noreply, clear_await(state, from)}
{:error, {0, _}} ->
# Ideally we should not poll and we should handle this with SIGCHLD signal
tref =
- Elixir.Process.send_after(self(), {:check_exit, from}, state.opts.io_exit_check_delay)
+ Elixir.Process.send_after(self(), {:check_exit, from}, state.opts[:io_exit_check_delay])
{:noreply, put_timer(state, from, :check, tref)}
{:error, {-1, status}} ->
GenServer.reply(from, {:error, status})
cancel_timer(state, from, :timeout)
{:noreply, clear_await(state, from)}
end
end
defp do_kill(context, :sigkill), do: ProcessNif.sys_kill(context)
defp do_kill(context, :sigterm), do: ProcessNif.sys_terminate(context)
defp do_close(state, type) do
case ProcessNif.sys_close(state.context, stream_type(type)) do
:ok ->
{:reply, :ok, state}
{:error, errno} ->
raise errno
{:reply, {:error, errno}, %Process{state | errno: errno}}
end
end
defp clear_await(state, from) do
%Process{state | await: Map.delete(state.await, from)}
end
defp cancel_timer(state, from, key) do
case get_timer(state, from, key) do
nil -> :ok
tref -> Elixir.Process.cancel_timer(tref)
end
end
defp put_timer(state, from, key, timer) do
if Map.has_key?(state.await, from) do
await = put_in(state.await, [from, key], timer)
%Process{state | await: await}
else
%Process{state | await: %{from => %{key => timer}}}
end
end
defp get_timer(state, from, key), do: get_in(state.await, [from, key])
- # Try to gracefully terminate external proccess if the genserver associated with the process is killed
+ # Try to gracefully terminate external process if the genserver associated with the process is killed
defp start_watcher(context) do
process_server = self()
watcher_pid = spawn(fn -> watcher(process_server, context) end)
receive do
{^watcher_pid, :done} -> :ok
end
end
defp stream_type(:stdin), do: 0
defp stream_type(:stdout), do: 1
defp process_exit?(context) do
match?({:ok, _}, ProcessNif.sys_wait(context))
end
defp process_exit?(context, timeout) do
if process_exit?(context) do
true
else
:timer.sleep(timeout)
process_exit?(context)
end
end
# For a proper process exit, the parent of the child *must* wait() for
# the child process to terminate and "pick up" after the exit
# (receive the child's exit_status). Otherwise, resources acquired by
# the child, such as file descriptors, won't be released even if the
# child process itself is terminated.
defp watcher(process_server, context) do
ref = Elixir.Process.monitor(process_server)
send(process_server, {self(), :done})
receive do
{:DOWN, ^ref, :process, ^process_server, _reason} ->
try do
Logger.debug(fn -> "Stopping external program" end)
# sys_close is idempotent, calling it multiple times is okay
ProcessNif.sys_close(context, stream_type(:stdin))
ProcessNif.sys_close(context, stream_type(:stdout))
# we wait at most 100ms for the program to exit
process_exit?(context, 100) && throw(:done)
Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
ProcessNif.sys_terminate(context)
process_exit?(context, 100) && throw(:done)
Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
ProcessNif.sys_kill(context)
process_exit?(context, 1000) && throw(:done)
Logger.error("[exile] failed to kill external process")
raise "Failed to kill external process"
catch
:done -> Logger.debug(fn -> "External program exited successfully" end)
end
end
end
end
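
The `do_write/1` clauses in the file above keep the unwritten tail of a binary whenever `sys_write` accepts fewer bytes than it was offered. The following is a minimal sketch of that bookkeeping only. `PartialWriteSketch` and `fake_write/2` are hypothetical names that stand in for the NIF call and are not part of Exile. The sketch loops immediately, whereas the real server returns and waits for a `:ready_output` select message before retrying.

```elixir
defmodule PartialWriteSketch do
  # Hypothetical stand-in for ProcessNif.sys_write/2: accepts at most
  # `capacity` bytes per call and reports how many bytes it took.
  def fake_write(bin, capacity), do: {:ok, min(byte_size(bin), capacity)}

  # Keep writing until nothing is left, trimming the pending binary with
  # binary_part/3 after every short write, as do_write/1 does.
  def write_all(bin, capacity, calls \\ 0)

  def write_all(<<>>, _capacity, calls), do: {:ok, calls}

  def write_all(bin, capacity, calls) when capacity > 0 do
    {:ok, size} = fake_write(bin, capacity)
    rest = binary_part(bin, size, byte_size(bin) - size)
    write_all(rest, capacity, calls + 1)
  end
end

# 10 bytes through a 4-byte "pipe" take three writes: 4 + 4 + 2
{:ok, calls} = PartialWriteSketch.write_all(String.duplicate("A", 10), 4)
IO.inspect(calls)
```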
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index 68a0655..79e01ed 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,102 +1,132 @@
defmodule Exile.Stream do
@moduledoc """
Defines an `Exile.Stream` struct returned by `Exile.stream!/2`.
"""
alias Exile.Process
- defstruct [:proc_server, :stream_opts]
+ defmodule Sink do
+ defstruct [:process]
- @default_opts %{exit_timeout: :infinity, chunk_size: 65535}
+ defimpl Collectable do
+ def into(%{process: process} = stream) do
+ collector_fun = fn
+ :ok, {:cont, x} ->
+ :ok = Process.write(process, x)
+
+ :ok, :done ->
+ :ok = Process.close_stdin(process)
+ stream
+
+ :ok, :halt ->
+ :ok = Process.close_stdin(process)
+ end
+
+ {:ok, collector_fun}
+ end
+ end
+ end
+
+ defstruct [:process, :stream_opts]
+
+ @default_opts [exit_timeout: :infinity, chunk_size: 65535]
@type t :: %__MODULE__{}
@doc false
- def __build__(cmd, args, opts) do
- opts = Map.merge(@default_opts, opts)
- {stream_opts, proc_opts} = Map.split(opts, [:exit_timeout, :chunk_size])
- {:ok, proc} = Process.start_link(cmd, args, proc_opts)
- %Exile.Stream{proc_server: proc, stream_opts: stream_opts}
- end
+ def __build__(cmd_with_args, opts) do
+ {stream_opts, process_opts} = Keyword.split(opts, [:exit_timeout, :chunk_size, :input])
+ stream_opts = Keyword.merge(@default_opts, stream_opts)
- defimpl Collectable do
- def into(%{proc_server: proc} = stream) do
- collector_fun = fn
- :ok, {:cont, x} ->
- :ok = Process.write(proc, x)
+ {:ok, process} = Process.start_link(cmd_with_args, process_opts)
- :ok, :done ->
- :ok = Process.close_stdin(proc)
- stream
+ start_input_streamer(%Sink{process: process}, stream_opts[:input])
- :ok, :halt ->
- :ok = Process.close_stdin(proc)
- end
+ %Exile.Stream{process: process, stream_opts: stream_opts}
+ end
- {:ok, collector_fun}
+ @doc false
+ defp start_input_streamer(sink, input) do
+ cond do
+ is_nil(input) ->
+ :ok
+
+ !is_function(input) && Enumerable.impl_for(input) ->
+ spawn_link(fn ->
+ Enum.into(input, sink)
+ end)
+
+ is_function(input, 1) ->
+ spawn_link(fn ->
+ input.(sink)
+ end)
+
+ true ->
+ raise ArgumentError,
+ message: ":input must be either Enumerable or a function with arity 1"
end
end
defimpl Enumerable do
- def reduce(%{proc_server: proc, stream_opts: stream_opts}, acc, fun) do
+ def reduce(%{process: process, stream_opts: stream_opts}, acc, fun) do
start_fun = fn -> :ok end
next_fun = fn :ok ->
- case Process.read(proc, stream_opts.chunk_size) do
+ case Process.read(process, stream_opts[:chunk_size]) do
{:eof, []} ->
{:halt, :normal}
{:eof, x} ->
# multiple reads on a closed pipe always return :eof
- {[x], :ok}
+ {[IO.iodata_to_binary(x)], :ok}
{:ok, x} ->
- {[x], :ok}
+ {[IO.iodata_to_binary(x)], :ok}
{:error, errno} ->
raise "Failed to read from the process. errno: #{errno}"
end
end
after_fun = fn exit_type ->
try do
# always close stdin before stopping to give the command a chance to exit properly
- Process.close_stdin(proc)
- result = Process.await_exit(proc, stream_opts.exit_timeout)
+ Process.close_stdin(process)
+ result = Process.await_exit(process, stream_opts[:exit_timeout])
case {exit_type, result} do
{_, :timeout} ->
- Process.kill(proc, :sigkill)
- raise "command fail to exit within timeout: #{stream_opts.exit_timeout}"
+ Process.kill(process, :sigkill)
+ raise "command fail to exit within timeout: #{stream_opts[:exit_timeout]}"
{:normal, {:ok, {:exit, 0}}} ->
:ok
{:normal, {:ok, error}} ->
raise "command exited with status: #{inspect(error)}"
{_, error} ->
- Process.kill(proc, :sigkill)
+ Process.kill(process, :sigkill)
raise "command exited with error: #{inspect(error)}"
end
after
- Process.stop(proc)
+ Process.stop(process)
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
def count(_stream) do
{:error, __MODULE__}
end
def member?(_stream, _term) do
{:error, __MODULE__}
end
def slice(_stream) do
{:error, __MODULE__}
end
end
end
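
The `Sink` struct introduced in the file above is a small `Collectable` whose collector function writes each chunk and closes stdin when the input is done or halted. The shape of that collector can be tried without an external program. This is a sketch under assumptions: `SinkSketch` is a hypothetical module that forwards chunks to an `Agent` in place of `Process.write/2`, and it does nothing where the real sink calls `Process.close_stdin/1`.

```elixir
defmodule SinkSketch do
  # Hypothetical sink: collects chunks in an Agent instead of writing
  # them to the stdin of an external program.
  defstruct [:agent]

  defimpl Collectable do
    def into(%{agent: agent} = sink) do
      collector_fun = fn
        :ok, {:cont, chunk} ->
          # plays the role of Process.write/2; Agent.update/2 returns :ok,
          # which becomes the accumulator for the next call
          Agent.update(agent, fn chunks -> [chunk | chunks] end)

        :ok, :done ->
          # the real sink closes stdin here before returning itself
          sink

        :ok, :halt ->
          :ok
      end

      {:ok, collector_fun}
    end
  end
end

{:ok, agent} = Agent.start_link(fn -> [] end)
sink = struct!(SinkSketch, agent: agent)

# Enum.into/2 drives the collector, as start_input_streamer/2 does
Enum.into(["a", "b", "c"], sink)

chunks = agent |> Agent.get(& &1) |> Enum.reverse()
IO.inspect(chunks)
```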
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index 7c99198..e2dbb51 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,281 +1,281 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
alias Exile.Process
test "read" do
- {:ok, s} = Process.start_link("echo", ["test"])
+ {:ok, s} = Process.start_link(~w(echo test))
assert {:eof, iodata} = Process.read(s, 100)
assert IO.iodata_to_binary(iodata) == "test\n"
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
end
test "write" do
- {:ok, s} = Process.start_link("cat", [])
+ {:ok, s} = Process.start_link(~w(cat))
assert :ok == Process.write(s, "hello")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "hello"
assert :ok == Process.write(s, "world")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "world"
assert :ok == Process.close_stdin(s)
assert {:eof, []} == Process.read(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
end
test "stdin close" do
logger = start_events_collector()
# base64 produces output only after getting EOF from stdin. we
# collect events in order and assert that we can still read from
# stdout even after closing stdin
- {:ok, s} = Process.start_link("base64", [])
+ {:ok, s} = Process.start_link(~w(base64))
# parallel reader should be blocked till we close stdin
start_parallel_reader(s, logger)
:timer.sleep(100)
assert :ok == Process.write(s, "hello")
add_event(logger, {:write, "hello"})
assert :ok == Process.write(s, "world")
add_event(logger, {:write, "world"})
:timer.sleep(100)
assert :ok == Process.close_stdin(s)
add_event(logger, :input_close)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 50)
assert [
{:write, "hello"},
{:write, "world"},
:input_close,
{:read, "aGVsbG93b3JsZA==\n"},
:eof
] == get_events(logger)
end
test "external command termination on stop" do
- {:ok, s} = Process.start_link("cat", [])
+ {:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
:timer.sleep(100)
refute os_process_alive?(os_pid)
end
test "external command kill on stop" do
# cat command hangs waiting for EOF
- {:ok, s} = Process.start_link(fixture("ignore_sigterm.sh"), [])
+ {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
if os_process_alive?(os_pid) do
:timer.sleep(3000)
refute os_process_alive?(os_pid)
else
:ok
end
end
test "exit status" do
- {:ok, s} = Process.start_link("sh", ~w(-c "exit 2"))
+ {:ok, s} = Process.start_link(~w(sh -c "exit 2"))
assert {:ok, {:exit, 2}} == Process.await_exit(s, 500)
end
test "writing binary larger than pipe buffer size" do
large_bin = generate_binary(5 * 65535)
- {:ok, s} = Process.start_link("cat", [])
+ {:ok, s} = Process.start_link(~w(cat))
writer =
Task.async(fn ->
Process.write(s, large_bin)
Process.close_stdin(s)
end)
:timer.sleep(100)
{_, iodata} = Process.read(s, 5 * 65535)
Task.await(writer)
assert IO.iodata_length(iodata) == 5 * 65535
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
end
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
- {:ok, s} = Process.start_link("cat", [])
+ {:ok, s} = Process.start_link(~w(cat))
large_bin = generate_binary(65535 * 5)
writer =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.write(s, large_bin)
add_event(logger, {:write, i})
end)
Process.close_stdin(s)
end)
:timer.sleep(50)
reader =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.read(s, 5 * 65535)
add_event(logger, {:read, i})
# delay in reading should delay writes
:timer.sleep(10)
end)
end)
Task.await(writer)
Task.await(reader)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
assert [
write: 1,
read: 1,
write: 2,
read: 2,
write: 3,
read: 3,
write: 4,
read: 4,
write: 5,
read: 5,
write: 6,
read: 6,
write: 7,
read: 7,
write: 8,
read: 8,
write: 9,
read: 9,
write: 10,
read: 10
] == get_events(logger)
end
# this test does not work properly on Linux
@tag :skip
test "if we are leaking file descriptor" do
- {:ok, s} = Process.start_link("sleep", ["60"])
+ {:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
Process.stop(s)
open_files = parse_lsof(bin)
assert [%{fd: "0", name: _, type: "PIPE"}, %{type: "PIPE", fd: "1", name: _}] = open_files
end
test "process kill with pending write" do
- {:ok, s} = Process.start_link("cat", [])
+ {:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
large_data =
Stream.cycle(["test"]) |> Stream.take(500_000) |> Enum.to_list() |> IO.iodata_to_binary()
task =
Task.async(fn ->
try do
Process.write(s, large_data)
catch
:exit, reason -> reason
end
end)
:timer.sleep(200)
Process.stop(s)
:timer.sleep(3000)
refute os_process_alive?(os_pid)
assert {:normal, _} = Task.await(task)
end
def start_parallel_reader(proc_server, logger) do
spawn_link(fn -> reader_loop(proc_server, logger) end)
end
def reader_loop(proc_server, logger) do
case Process.read(proc_server) do
{:ok, data} ->
add_event(logger, {:read, data})
reader_loop(proc_server, logger)
{:eof, []} ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
Stream.repeatedly(fn -> "A" end) |> Enum.take(size) |> IO.iodata_to_binary()
end
end
diff --git a/test/exile/sync_process_test.exs b/test/exile/sync_process_test.exs
index 784f4a6..cc53806 100644
--- a/test/exile/sync_process_test.exs
+++ b/test/exile/sync_process_test.exs
@@ -1,26 +1,26 @@
defmodule Exile.SyncProcessTest do
use ExUnit.Case, async: false
alias Exile.Process
@bin Stream.repeatedly(fn -> "A" end) |> Enum.take(65535) |> IO.iodata_to_binary()
test "memory leak" do
:timer.sleep(1000)
before_exec = :erlang.memory(:total)
- {:ok, s} = Process.start_link("cat", [])
+ {:ok, s} = Process.start_link(~w(cat))
Enum.each(1..500, fn _ ->
:ok = Process.write(s, @bin)
{:ok, _} = Process.read(s, 65535)
end)
:timer.sleep(1000)
after_exec = :erlang.memory(:total)
assert_in_delta before_exec, after_exec, 1024 * 1024
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
end
end
diff --git a/test/exile_test.exs b/test/exile_test.exs
index 4577bc5..3fdae15 100644
--- a/test/exile_test.exs
+++ b/test/exile_test.exs
@@ -1,27 +1,23 @@
defmodule ExileTest do
use ExUnit.Case
- test "stream" do
- str = "hello"
-
- proc_stream = Exile.stream!("cat")
-
- Task.async(fn ->
- Stream.map(1..1000, fn _ -> str end)
- |> Enum.into(proc_stream)
- end)
+ test "stream with enumerable" do
+ proc_stream = Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end))
+ output = proc_stream |> Enum.to_list()
+ assert IO.iodata_length(output) == 1000
+ end
- output =
- proc_stream
- |> Enum.to_list()
- |> IO.iodata_to_binary()
+ test "stream with collectable" do
+ proc_stream =
+ Exile.stream!(["cat"], input: fn sink -> Enum.into(1..1000, sink, fn _ -> "a" end) end)
- assert IO.iodata_length(output) == 1000 * String.length(str)
+ output = proc_stream |> Enum.to_list()
+ assert IO.iodata_length(output) == 1000
end
test "stream without stdin" do
- proc_stream = Exile.stream!("echo", ["hello"])
+ proc_stream = Exile.stream!(~w(echo hello))
output = proc_stream |> Enum.to_list()
assert IO.iodata_to_binary(output) == "hello\n"
end
end
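
The two stream tests above exercise both accepted shapes of the `:input` option: an enumerable and a one-argument function that receives the sink. The dispatch in `start_input_streamer/2` can be sketched on its own. `InputDispatchSketch.classify/1` is a hypothetical helper, not part of Exile, that only reports which branch would be taken instead of spawning a linked process.

```elixir
defmodule InputDispatchSketch do
  # Mirrors the `cond` in start_input_streamer/2 without side effects.
  def classify(input) do
    cond do
      is_nil(input) ->
        :no_input

      # Two-arity functions implement Enumerable, so functions are
      # ruled out before the protocol check.
      !is_function(input) && Enumerable.impl_for(input) ->
        :enumerable

      is_function(input, 1) ->
        :function

      true ->
        raise ArgumentError,
          message: ":input must be either Enumerable or a function with arity 1"
    end
  end
end

IO.inspect(InputDispatchSketch.classify(nil))
IO.inspect(InputDispatchSketch.classify(Stream.map(1..3, fn _ -> "a" end)))
IO.inspect(InputDispatchSketch.classify(fn sink -> Enum.into(["a"], sink) end))
```

Any other value, such as an integer, falls through to the last branch and raises `ArgumentError`.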
