diff --git a/README.md b/README.md
index bcb41b6..c573208 100644
--- a/README.md
+++ b/README.md
@@ -1,77 +1,78 @@
# Exile
Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure and non-blocking IO, and tries to fix issues with ports.
Exile is built around the idea of having demand-driven, asynchronous interaction with an external process. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses a NIF. See [Rationale](#rationale) for details. It also provides a stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as
``` elixir
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
`Exile.stream!` is a convenience wrapper around `Exile.Process`. If you want more control over stdin, stdout, and the OS process, use `Exile.Process` directly.
Exile requires OTP v22.1 and above.
**Exile is experimental and still a work in progress. Exile is based on a NIF; please understand the implications before using it.**
## Rationale
Existing approaches
#### Port
Port is the default way of executing external commands. This is okay when you have control over the external program's implementation and the interaction is minimal. Port has several important issues.
* it can end up creating [zombie processes](https://hexdocs.pm/elixir/Port.html#module-zombie-operating-system-processes)
* it cannot selectively close stdin, which is required when the external program acts on EOF from stdin
* it sends command output as a message to the beam process. This does not put back-pressure on the external program and can lead to exhausting VM memory
#### Middleware based solutions
Libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. solve the first two issues associated with ports: zombie processes and selectively closing stdin. They do not solve the third issue, back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally they use a port for reading the output and writing the input. Note that these libraries solve a different subset of issues and have different functionality; please check the relevant project page for details.
* no back-pressure
* additional os process (middleware) for every execution of your program
* in a few cases, such as Porcelain, the user has to install this external program explicitly
* might not be suitable when the program requires constant communication between beam process and external program
On the plus side, unlike Exile, bugs in the implementation do not bring down the whole beam VM.
#### [ExCmd](https://github.com/akash-akya/ex_cmd)
This is my other stab at solving the problem of back-pressure on the external program. It implements a demand-driven protocol using [odu](https://github.com/akash-akya/odu) to solve this. Since ExCmd is also a port based solution, the concerns previously mentioned apply to ExCmd too.
## Exile
Internally Exile uses non-blocking asynchronous system calls to interact with the external process. It does not use port's message based communication; instead it does raw stdio using a NIF. Most of the system calls are non-blocking, so it should not block the beam schedulers. It makes use of dirty schedulers for IO.
**Highlights**
* Back pressure
* it does not use any middleware program
* no additional os process. no performance/resource cost
* no need to install any external command
* tries to handle zombie processes by attempting to clean up the external process. *But* as there is no middleware involved with Exile, it is still possible to end up with a zombie process
* stream abstraction
+* selectively consume stdout and stderr
If you are executing a huge number of external programs **concurrently** (more than a few hundred), you might have to increase the open file descriptors limit (`ulimit -n`).
Non-blocking IO can be used for other interesting things, such as reading named pipe (FIFO) files. `Exile.stream!(~w(cat data.pipe))` does not block schedulers, so you can open hundreds of FIFO files, unlike with the default `file` based IO.
#### TODO
* add benchmarks results
### 🚨 Obligatory NIF warning
As with any NIF based solution, bugs or issues in the Exile implementation **can bring down the beam VM**. But the NIF implementation is comparatively small and mostly uses POSIX system calls. Also, spawned external processes are still completely isolated at the OS level.
If all you want is to run a command with no communication, then sticking with `System.cmd` is a better option.
### License
Copyright (c) 2020 Akash Hiremath.
Exile source code is released under Apache License 2.0. Check [LICENSE](LICENSE.md) for more information.
diff --git a/c_src/exile.c b/c_src/exile.c
index c4b4f14..213b0b4 100644
--- a/c_src/exile.c
+++ b/c_src/exile.c
@@ -1,394 +1,383 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "erl_nif.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#ifdef ERTS_DIRTY_SCHEDULERS
#define USE_DIRTY_IO ERL_NIF_DIRTY_JOB_IO_BOUND
#else
#define USE_DIRTY_IO 0
#endif
// #define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
enif_fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, \
__func__); \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
#else
#define debug(...)
#endif
#define error(...) \
do { \
enif_fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, \
__func__); \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
#define assert_argc(argc, count) \
if (argc != count) { \
error("number of arguments must be %d", count); \
return enif_make_badarg(env); \
}
static const int UNBUFFERED_READ = -1;
static const int PIPE_BUF_SIZE = 65535;
static const int FD_CLOSED = -1;
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_UNDEFINED;
static ERL_NIF_TERM ATOM_INVALID_FD;
static ERL_NIF_TERM ATOM_SELECT_CANCEL_ERROR;
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_SIGKILL;
static ERL_NIF_TERM ATOM_SIGTERM;
static void close_fd(int *fd) {
if (*fd != FD_CLOSED) {
close(*fd);
*fd = FD_CLOSED;
}
}
static int cancel_select(ErlNifEnv *env, int *fd) {
int ret;
if (*fd != FD_CLOSED) {
ret = enif_select(env, *fd, ERL_NIF_SELECT_STOP, fd, NULL, ATOM_UNDEFINED);
if (ret < 0)
perror("cancel_select()");
return ret;
}
return 0;
}
static void io_resource_dtor(ErlNifEnv *env, void *obj) {
debug("Exile io_resource_dtor called");
}
static void io_resource_stop(ErlNifEnv *env, void *obj, int fd,
int is_direct_call) {
close_fd(&fd);
debug("Exile io_resource_stop called %d", fd);
}
static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
int *fd = (int *)obj;
cancel_select(env, fd);
debug("Exile io_resource_down called");
}
static ErlNifResourceTypeInit io_rt_init;
static ErlNifResourceType *FD_RT;
static inline ERL_NIF_TERM make_ok(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_OK, term);
}
static inline ERL_NIF_TERM make_error(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_ERROR, term);
}
/* time is assumed to be in microseconds */
static void notify_consumed_timeslice(ErlNifEnv *env, ErlNifTime start,
ErlNifTime stop) {
ErlNifTime pct;
pct = (ErlNifTime)((stop - start) / 10);
if (pct > 100)
pct = 100;
else if (pct == 0)
pct = 1;
enif_consume_timeslice(env, pct);
}
static int select_write(ErlNifEnv *env, int *fd) {
int ret =
enif_select(env, *fd, ERL_NIF_SELECT_WRITE, fd, NULL, ATOM_UNDEFINED);
if (ret != 0)
perror("select_write()");
return ret;
}
static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
assert_argc(argc, 2);
ErlNifTime start;
ssize_t size;
ErlNifBinary bin;
int write_errno;
int *fd;
start = enif_monotonic_time(ERL_NIF_USEC);
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (enif_inspect_binary(env, argv[1], &bin) != true)
return enif_make_badarg(env);
if (bin.size == 0)
return enif_make_badarg(env);
/* should we limit the bin.size here? */
size = write(*fd, bin.data, bin.size);
write_errno = errno;
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (size >= (ssize_t)bin.size) { // request completely satisfied
return make_ok(env, enif_make_int(env, size));
} else if (size >= 0) { // request partially satisfied
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, enif_make_int(env, size));
} else if (write_errno == EAGAIN || write_errno == EWOULDBLOCK) { // busy
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else {
perror("write()");
return make_error(env, enif_make_int(env, write_errno));
}
}
static int select_read(ErlNifEnv *env, int *fd) {
int ret =
enif_select(env, *fd, ERL_NIF_SELECT_READ, fd, NULL, ATOM_UNDEFINED);
if (ret != 0)
perror("select_read()");
return ret;
}
static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
assert_argc(argc, 1);
ERL_NIF_TERM term;
ErlNifPid pid;
int *fd;
int ret;
fd = enif_alloc_resource(FD_RT, sizeof(int));
if (!enif_get_int(env, argv[0], fd))
goto error_exit;
if (!enif_self(env, &pid)) {
error("failed get self pid");
goto error_exit;
}
ret = enif_monitor_process(env, fd, &pid, NULL);
if (ret < 0) {
error("no down callback is provided");
goto error_exit;
} else if (ret > 0) {
error("pid is not alive");
goto error_exit;
}
term = enif_make_resource(env, fd);
enif_release_resource(fd);
return make_ok(env, term);
error_exit:
enif_release_resource(fd);
return ATOM_ERROR;
}
-static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- assert_argc(argc, 2);
-
- ErlNifTime start;
- int size, demand;
- int *fd;
-
- start = enif_monotonic_time(ERL_NIF_USEC);
-
- if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
- return make_error(env, ATOM_INVALID_FD);
-
- if (!enif_get_int(env, argv[1], &demand))
+static ERL_NIF_TERM read_fd(ErlNifEnv *env, int *fd, int max_size) {
+ if (max_size == UNBUFFERED_READ) {
+ max_size = PIPE_BUF_SIZE;
+ } else if (max_size < 1) {
return enif_make_badarg(env);
-
- size = demand;
-
- if (demand == UNBUFFERED_READ) {
- size = PIPE_BUF_SIZE;
- } else if (demand < 1) {
- return enif_make_badarg(env);
- } else if (demand > PIPE_BUF_SIZE) {
- size = PIPE_BUF_SIZE;
+ } else if (max_size > PIPE_BUF_SIZE) {
+ max_size = PIPE_BUF_SIZE;
}
- unsigned char buf[size];
- ssize_t result = read(*fd, buf, size);
+ ErlNifTime start = enif_monotonic_time(ERL_NIF_USEC);
+ unsigned char buf[max_size];
+ ssize_t result = read(*fd, buf, max_size);
int read_errno = errno;
ERL_NIF_TERM bin_term = 0;
if (result >= 0) {
/* no need to release this binary */
unsigned char *temp = enif_make_new_binary(env, result, &bin_term);
memcpy(temp, buf, result);
}
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (result >= 0) {
- /* we do not 'select' if demand completely satisfied OR EOF OR its
- * UNBUFFERED_READ */
- if (result == demand || result == 0 || demand == UNBUFFERED_READ) {
- return make_ok(env, bin_term);
- } else { // demand partially satisfied
- int retval = select_read(env, fd);
- if (retval != 0)
- return make_error(env, enif_make_int(env, retval));
- return make_ok(env, bin_term);
- }
+ return make_ok(env, bin_term);
+ } else if (read_errno == EAGAIN || read_errno == EWOULDBLOCK) { // busy
+ int retval = select_read(env, fd);
+ if (retval != 0)
+ return make_error(env, enif_make_int(env, retval));
+ return make_error(env, ATOM_EAGAIN);
} else {
- if (read_errno == EAGAIN || read_errno == EWOULDBLOCK) { // busy
- int retval = select_read(env, fd);
- if (retval != 0)
- return make_error(env, enif_make_int(env, retval));
- return make_error(env, ATOM_EAGAIN);
- } else {
- perror("read()");
- return make_error(env, enif_make_int(env, read_errno));
- }
+ perror("read_fd()");
+ return make_error(env, enif_make_int(env, read_errno));
}
}
+static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc,
+ const ERL_NIF_TERM argv[]) {
+ assert_argc(argc, 2);
+
+ int max_size;
+ int *fd;
+
+ if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
+ return make_error(env, ATOM_INVALID_FD);
+
+ if (!enif_get_int(env, argv[1], &max_size))
+ return enif_make_badarg(env);
+
+ return read_fd(env, fd, max_size);
+}
+
static ERL_NIF_TERM nif_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
assert_argc(argc, 1);
int *fd;
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (cancel_select(env, fd) < 0)
return make_error(env, ATOM_SELECT_CANCEL_ERROR);
close_fd(fd);
return ATOM_OK;
}
static ERL_NIF_TERM nif_is_os_pid_alive(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
assert_argc(argc, 1);
pid_t pid;
if (!enif_get_int(env, argv[0], (int *)&pid))
return enif_make_badarg(env);
int result = kill(pid, 0);
if (result == 0)
return ATOM_TRUE;
else
return ATOM_FALSE;
}
static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
assert_argc(argc, 2);
pid_t pid;
int ret;
// we should not assume pid type to be `int`?
if (!enif_get_int(env, argv[0], (int *)&pid))
return enif_make_badarg(env);
if (enif_compare(argv[1], ATOM_SIGKILL) == 0)
ret = kill(pid, SIGKILL);
else if (enif_compare(argv[1], ATOM_SIGTERM) == 0)
ret = kill(pid, SIGTERM);
else
return enif_make_badarg(env);
if (ret != 0) {
perror("[exile] failed to send signal");
return make_error(
env, enif_make_string(env, "failed to send signal", ERL_NIF_LATIN1));
}
return ATOM_OK;
}
static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
io_rt_init.dtor = io_resource_dtor;
io_rt_init.stop = io_resource_stop;
io_rt_init.down = io_resource_down;
FD_RT =
enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
ATOM_TRUE = enif_make_atom(env, "true");
ATOM_FALSE = enif_make_atom(env, "false");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_UNDEFINED = enif_make_atom(env, "undefined");
ATOM_INVALID_FD = enif_make_atom(env, "invalid_fd_resource");
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_SELECT_CANCEL_ERROR = enif_make_atom(env, "select_cancel_error");
ATOM_SIGTERM = enif_make_atom(env, "sigterm");
ATOM_SIGKILL = enif_make_atom(env, "sigkill");
return 0;
}
static void on_unload(ErlNifEnv *env, void *priv) {
debug("exile unload");
enif_free(priv);
}
static ErlNifFunc nif_funcs[] = {
{"nif_read", 2, nif_read, USE_DIRTY_IO},
{"nif_create_fd", 1, nif_create_fd, USE_DIRTY_IO},
{"nif_write", 2, nif_write, USE_DIRTY_IO},
{"nif_close", 1, nif_close, USE_DIRTY_IO},
{"nif_is_os_pid_alive", 1, nif_is_os_pid_alive, USE_DIRTY_IO},
{"nif_kill", 2, nif_kill, USE_DIRTY_IO}};
ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
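The `read_fd` refactor above returns whatever a single non-blocking `read` yields and only asks the caller to retry on `EAGAIN`. A minimal standalone sketch of that result classification, outside the NIF API, might look like the following. The names `read_status`, `set_nonblocking`, and `read_once` are hypothetical and do not appear in Exile; in the NIF itself, the would-block branch is where `enif_select` is armed.

```c
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical result codes, for illustration only. */
enum read_status { READ_DATA, READ_EOF, READ_WOULD_BLOCK, READ_FAILED };

/* Put `fd` into non-blocking mode while preserving its other status flags. */
static int set_nonblocking(int fd) {
  int flags = fcntl(fd, F_GETFL);
  if (flags < 0)
    return -1;
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Issue one read of at most `cap` bytes and classify the outcome.
 * `*len` receives the number of bytes stored in `buf` (0 unless READ_DATA). */
static enum read_status read_once(int fd, unsigned char *buf, size_t cap,
                                  size_t *len) {
  ssize_t n = read(fd, buf, cap);
  int read_errno = errno; /* capture before any other call can overwrite it */

  *len = 0;
  if (n > 0) {
    *len = (size_t)n;
    return READ_DATA; /* may be fewer than `cap` bytes; the caller reads again */
  }
  if (n == 0)
    return READ_EOF; /* the write end of the pipe has been closed */
  if (read_errno == EAGAIN || read_errno == EWOULDBLOCK)
    return READ_WOULD_BLOCK; /* nothing available yet; wait for readiness */
  return READ_FAILED;
}
```

A caller would loop on `read_once`, treating `READ_WOULD_BLOCK` as "wait until the descriptor is readable" rather than as an error.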
diff --git a/c_src/spawner.c b/c_src/spawner.c
index 23e6700..3d4db90 100644
--- a/c_src/spawner.c
+++ b/c_src/spawner.c
@@ -1,224 +1,256 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <fcntl.h>
+#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>
// these definitions are Linux only at the moment
#ifndef CMSG_LEN
socklen_t CMSG_LEN(size_t len) {
return (CMSG_DATA((struct cmsghdr *)NULL) - (unsigned char *)NULL) + len;
}
#endif
#ifndef CMSG_SPACE
socklen_t CMSG_SPACE(size_t len) {
struct msghdr msg;
struct cmsghdr cmsg;
msg.msg_control = &cmsg;
msg.msg_controllen =
~0; /* To maximize the chance that CMSG_NXTHDR won't return NULL */
cmsg.cmsg_len = CMSG_LEN(len);
return (unsigned char *)CMSG_NXTHDR(&msg, &cmsg) - (unsigned char *)&cmsg;
}
#endif
// #define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, __func__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
} while (0)
#else
#define debug(...)
#endif
#define error(...) \
do { \
fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, __func__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
} while (0)
static const int PIPE_READ = 0;
static const int PIPE_WRITE = 1;
/* We are choosing an exit code which is not reserved see:
* https://www.tldp.org/LDP/abs/html/exitcodes.html. */
static const int FORK_EXEC_FAILURE = 125;
static int set_flag(int fd, int flags) {
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
}
-static int send_io_fds(int socket, int read_fd, int write_fd) {
+static int send_io_fds(int socket, int stdin_fd, int stdout_fd, int stderr_fd) {
struct msghdr msg = {0};
struct cmsghdr *cmsg;
- int fds[2];
- char buf[CMSG_SPACE(2 * sizeof(int))], dup[256];
+ int fds[3];
+ char buf[CMSG_SPACE(3 * sizeof(int))];
+ char dup[256];
struct iovec io;
memset(buf, '\0', sizeof(buf));
io.iov_base = &dup;
io.iov_len = sizeof(dup);
msg.msg_iov = &io;
msg.msg_iovlen = 1;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
- cmsg->cmsg_len = CMSG_LEN(2 * sizeof(int));
+ cmsg->cmsg_len = CMSG_LEN(3 * sizeof(int));
- fds[0] = read_fd;
- fds[1] = write_fd;
+ fds[0] = stdin_fd;
+ fds[1] = stdout_fd;
+ fds[2] = stderr_fd;
- memcpy((int *)CMSG_DATA(cmsg), fds, 2 * sizeof(int));
+ debug("stdout: %d, stderr: %d", stdout_fd, stderr_fd);
+
+ memcpy((int *)CMSG_DATA(cmsg), fds, 3 * sizeof(int));
if (sendmsg(socket, &msg, 0) < 0) {
debug("Failed to send message");
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
-static void close_pipes(int pipes[2][2]) {
- for (int i = 0; i < 2; i++) {
+static void close_pipes(int pipes[3][2]) {
+ for (int i = 0; i < 3; i++) {
if (pipes[i][PIPE_READ] > 0)
close(pipes[i][PIPE_READ]);
if (pipes[i][PIPE_WRITE] > 0)
close(pipes[i][PIPE_WRITE]);
}
}
-static int exec_process(char const *bin, char *const *args, int socket) {
- int pipes[2][2] = {{0, 0}, {0, 0}};
- int r_cmdin, w_cmdin, r_cmdout, w_cmdout;
+static int exec_process(char const *bin, char *const *args, int socket,
+ bool use_stderr) {
+ int pipes[3][2] = {{0, 0}, {0, 0}, {0, 0}};
+ int r_cmdin, w_cmdin, r_cmdout, w_cmdout, r_cmderr, w_cmderr;
int i;
- if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1) {
+ if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1 ||
+ pipe(pipes[STDERR_FILENO]) == -1) {
perror("[spawner] failed to create pipes");
close_pipes(pipes);
return 1;
}
debug("created pipes");
r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
+ r_cmderr = pipes[STDERR_FILENO][PIPE_READ];
+ w_cmderr = pipes[STDERR_FILENO][PIPE_WRITE];
+
if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
+ set_flag(w_cmderr, O_CLOEXEC) < 0 ||
set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
- set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0) {
+ set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0 ||
+ set_flag(r_cmderr, O_CLOEXEC | O_NONBLOCK) < 0) {
perror("[spawner] failed to set flags for pipes");
close_pipes(pipes);
return 1;
}
debug("set fd flags to pipes");
- if (send_io_fds(socket, w_cmdin, r_cmdout) != EXIT_SUCCESS) {
+ if (send_io_fds(socket, w_cmdin, r_cmdout, r_cmderr) != EXIT_SUCCESS) {
perror("[spawner] failed to send fd via socket");
close_pipes(pipes);
return 1;
}
debug("sent fds over UDS");
close(STDIN_FILENO);
close(w_cmdin);
if (dup2(r_cmdin, STDIN_FILENO) < 0) {
perror("[spawner] failed to dup to stdin");
_exit(FORK_EXEC_FAILURE);
}
close(STDOUT_FILENO);
close(r_cmdout);
if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
perror("[spawner] failed to dup to stdout");
_exit(FORK_EXEC_FAILURE);
}
+ if (use_stderr) {
+ close(STDERR_FILENO);
+ close(r_cmderr);
+ if (dup2(w_cmderr, STDERR_FILENO) < 0) {
+ perror("[spawner] failed to dup to stderr");
+ _exit(FORK_EXEC_FAILURE);
+ }
+ } else {
+ close(r_cmderr);
+ close(w_cmderr);
+ }
+
/* Close all non-standard io fds. Not closing STDERR */
for (i = STDERR_FILENO + 1; i < sysconf(_SC_OPEN_MAX); i++)
close(i);
debug("exec %s", bin);
execvp(bin, args);
perror("[spawner] execvp(): failed");
_exit(FORK_EXEC_FAILURE);
}
-static int spawn(const char *socket_path, const char *bin, char *const *args) {
+static int spawn(const char *socket_path, const char *use_stderr_str,
+ const char *bin, char *const *args) {
int socket_fd;
struct sockaddr_un socket_addr;
+ bool use_stderr;
+
+ if (strcmp(use_stderr_str, "true") == 0) {
+ use_stderr = true;
+ } else {
+ use_stderr = false;
+ }
socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (socket_fd == -1) {
debug("Failed to create socket");
return EXIT_FAILURE;
}
debug("created domain socket");
memset(&socket_addr, 0, sizeof(struct sockaddr_un));
socket_addr.sun_family = AF_UNIX;
strncpy(socket_addr.sun_path, socket_path, sizeof(socket_addr.sun_path) - 1);
if (connect(socket_fd, (struct sockaddr *)&socket_addr,
sizeof(struct sockaddr_un)) == -1) {
debug("Failed to connect to socket");
return EXIT_FAILURE;
}
debug("connected to exile");
- if (exec_process(bin, args, socket_fd) != 0)
+ if (exec_process(bin, args, socket_fd, use_stderr) != 0)
return EXIT_FAILURE;
// we should never reach here
return EXIT_SUCCESS;
}
int main(int argc, const char *argv[]) {
int status, i;
const char **exec_argv;
- if (argc < 3) {
- debug("expected at least 2 arguments, passed %d", argc);
+ if (argc < 4) {
+ debug("expected at least 3 arguments, passed %d", argc);
status = EXIT_FAILURE;
} else {
- exec_argv = malloc((argc - 2 + 1) * sizeof(char *));
+ exec_argv = malloc((argc - 3 + 1) * sizeof(char *));
- for (i = 2; i < argc; i++)
- exec_argv[i - 2] = argv[i];
+ for (i = 3; i < argc; i++)
+ exec_argv[i - 3] = argv[i];
- exec_argv[i - 2] = NULL;
+ exec_argv[i - 3] = NULL;
- debug("socket path: %s bin: %s", argv[1], argv[2]);
- status = spawn(argv[1], argv[2], (char *const *)exec_argv);
+ debug("socket path: %s use_stderr: %s bin: %s", argv[1], argv[2], argv[3]);
+ status = spawn(argv[1], argv[2], argv[3], (char *const *)exec_argv);
}
exit(status);
}
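The `send_io_fds` change above grows the `SCM_RIGHTS` control message from two descriptors to three. As a rough sketch of the same technique with both the sending and receiving side, assuming a connected `AF_UNIX` socket, the following helpers pass a fixed set of three descriptors in one message. `NUM_FDS`, `send_fds`, and `recv_fds` are hypothetical names, not part of the spawner, and the receiving side in Exile is not shown in this diff.

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#define NUM_FDS 3 /* stdin, stdout, stderr ends, as in the diff above */

/* Send NUM_FDS descriptors over a connected Unix domain socket. */
static int send_fds(int sock, const int fds[NUM_FDS]) {
  struct msghdr msg;
  struct iovec io;
  struct cmsghdr *cmsg;
  char payload = 'x'; /* one ordinary data byte to carry the control message */
  union {
    char buf[CMSG_SPACE(NUM_FDS * sizeof(int))];
    struct cmsghdr align; /* forces suitable alignment for the header */
  } ctrl;

  memset(&msg, 0, sizeof(msg));
  memset(&ctrl, 0, sizeof(ctrl));
  io.iov_base = &payload;
  io.iov_len = sizeof(payload);
  msg.msg_iov = &io;
  msg.msg_iovlen = 1;
  msg.msg_control = ctrl.buf;
  msg.msg_controllen = sizeof(ctrl.buf);

  cmsg = CMSG_FIRSTHDR(&msg);
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SCM_RIGHTS;
  cmsg->cmsg_len = CMSG_LEN(NUM_FDS * sizeof(int));
  memcpy(CMSG_DATA(cmsg), fds, NUM_FDS * sizeof(int));

  return sendmsg(sock, &msg, 0) < 0 ? -1 : 0;
}

/* Receive NUM_FDS descriptors; the kernel installs new descriptor numbers. */
static int recv_fds(int sock, int fds[NUM_FDS]) {
  struct msghdr msg;
  struct iovec io;
  struct cmsghdr *cmsg;
  char payload;
  union {
    char buf[CMSG_SPACE(NUM_FDS * sizeof(int))];
    struct cmsghdr align;
  } ctrl;

  memset(&msg, 0, sizeof(msg));
  memset(&ctrl, 0, sizeof(ctrl));
  io.iov_base = &payload;
  io.iov_len = sizeof(payload);
  msg.msg_iov = &io;
  msg.msg_iovlen = 1;
  msg.msg_control = ctrl.buf;
  msg.msg_controllen = sizeof(ctrl.buf);

  if (recvmsg(sock, &msg, 0) <= 0)
    return -1;

  cmsg = CMSG_FIRSTHDR(&msg);
  if (cmsg == NULL || cmsg->cmsg_level != SOL_SOCKET ||
      cmsg->cmsg_type != SCM_RIGHTS ||
      cmsg->cmsg_len != CMSG_LEN(NUM_FDS * sizeof(int)))
    return -1; /* not the control message this sketch expects */

  memcpy(fds, CMSG_DATA(cmsg), NUM_FDS * sizeof(int));
  return 0;
}
```

The receiver checks `cmsg_len` so that a message carrying a different number of descriptors is rejected instead of being partially copied.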
diff --git a/lib/exile.ex b/lib/exile.ex
index c86ee36..c0598e6 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,73 +1,93 @@
defmodule Exile do
@moduledoc """
Exile is an alternative for beam ports with back-pressure and non-blocking IO
"""
use Application
@doc false
def start(_type, _args) do
opts = [
name: Exile.WatcherSupervisor,
strategy: :one_for_one
]
# we use DynamicSupervisor for cleaning up external processes on
# :init.stop or SIGTERM
DynamicSupervisor.start_link(opts)
end
@doc """
Runs the given command with arguments and returns an Enumerable to read command output.
First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* Enumerable:
```
# List
Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
# Stream
Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
```
* Collectable:
If the input is a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. The return value of the function is ignored.
```
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
By default, no input is given to the command
* `exit_timeout` - Duration to wait for the external program to exit after completion before raising an error. Defaults to `:infinity`
- * `chunk_size` - Size of each iodata chunk emitted by Enumerable stream. When set to `:unbuffered` the output is unbuffered and chunk size will be variable depending on the amount of data availble at that time. Defaults to 65535
+ * `chunk_size` - Maximum size of each iodata chunk emitted by the stream. Chunk size will vary depending on the amount of data available at that time. Defaults to 65535
+
+ * `use_stderr` - When set to true, the stream will contain stderr output along with stdout output. Each element of the stream will be of the form `{:stdout, iodata}` or `{:stderr, iodata}` to differentiate the two streams. Defaults to false. See the example below
+
All other options are passed to `Exile.Process.start_link/3`
### Examples
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
+
+ Stream with stderr
+
+ ```
+ script = \"""
+ for i in {1..10}; do
+ echo "foo ${i}"
+ echo "bar ${i}" >&2
+ done
+ \"""
+
+ Exile.stream!(["sh", "-c", script], use_stderr: true)
+ |> Enum.each(fn {stream, lines} ->
+ String.split(lines, "\\n", trim: true)
+ |> Enum.each(fn line -> IO.puts("\#{stream}: \#{line}") end)
+ end)
+ ```
"""
@type collectable_func() :: (Collectable.t() -> any())
@spec stream!(nonempty_list(String.t()),
input: Enum.t() | collectable_func(),
exit_timeout: timeout(),
- chunk_size: pos_integer() | :unbuffered
+ max_chunk_size: pos_integer()
) :: ExCmd.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index 35ca40f..837b82a 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,519 +1,671 @@
defmodule Exile.Process do
@moduledoc """
GenServer which wraps spawned external command.
`Exile.stream!/1` should be preferred over this. Use this only if you need more control over the life-cycle of IO streams and OS process.
## Comparison with Port
* it is demand driven. The user explicitly has to `read` the command output, and the progress of the external command is controlled using OS pipes. Exile never loads more output than we can consume, so we should never experience memory issues
- * it can close stdin while consuming stdout
+ * it can close stdin while consuming output
* tries to handle zombie processes by attempting to clean up the external process. Note that there is no middleware involved with Exile, so it is still possible to end up with a zombie process.
Internally Exile uses non-blocking asynchronous system calls to interact with the external process. It does not use port's message based communication; instead it uses raw stdio and a NIF. Most of the system calls are non-blocking, so it should not block the beam schedulers. It makes use of dirty schedulers for IO
"""
+ use GenServer
+
+ alias __MODULE__
alias Exile.ProcessNif, as: Nif
require Logger
- use GenServer
- defmodule Error do
- defexception [:message]
- end
+ defstruct [
+ :args,
+ :errno,
+ :port,
+ :socket_path,
+ :stdin,
+ :stdout,
+ :stderr,
+ :status,
+ :use_stderr,
+ :await,
+ :read_stdout,
+ :read_stderr,
+ :read_any,
+ :write_stdin
+ ]
- alias Exile.Process.Error
+ defmodule Pending do
+ @moduledoc false
+ defstruct bin: [], size: 0, client_pid: nil
+ end
- @default_opts [env: []]
+ @default_opts [env: [], use_stderr: false]
+ @default_buffer_size 65535
@doc """
Starts `Exile.ProcessServer`
Starts external program using `cmd_with_args` with options `opts`
`cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
* `cd` - the directory to run the command in
* `env` - a list of tuples containing environment key-value. These can be accessed in the external program
+ * `use_stderr` - when set to true, Exile connects the stderr stream for consumption. Defaults to false. Note that when set to true, stderr must be consumed to prevent the external program from blocking
"""
@type process :: pid
@spec start_link(nonempty_list(String.t()),
cd: String.t(),
env: [{String.t(), String.t()}]
) :: {:ok, process} | {:error, any()}
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
with {:ok, args} <- normalize_args(cmd_with_args, opts) do
GenServer.start(__MODULE__, args)
end
end
@doc """
Closes external program's input stream
"""
@spec close_stdin(process) :: :ok | {:error, any()}
def close_stdin(process) do
GenServer.call(process, :close_stdin, :infinity)
end
@doc """
Writes iodata `data` to program's input streams
This blocks when the pipe is full
"""
@spec write(process, binary) :: :ok | {:error, any()}
def write(process, iodata) do
- GenServer.call(process, {:write, IO.iodata_to_binary(iodata)}, :infinity)
+ GenServer.call(process, {:write_stdin, IO.iodata_to_binary(iodata)}, :infinity)
end
@doc """
- Return bytes written by the program to output stream.
+ Returns bytes from executed command's stdout stream with maximum size `max_size`.
- This blocks until the programs write and flush the output depending on the `size`
+ Blocks if no bytes have been written to the stdout stream yet, and returns as soon as bytes are available
"""
- @spec read(process, pos_integer()) ::
- {:ok, iodata} | {:eof, iodata} | {:error, any()}
- def read(process, size) when (is_integer(size) and size > 0) or size == :unbuffered do
- GenServer.call(process, {:read, size}, :infinity)
+ @spec read(process, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
+ def read(process, max_size \\ @default_buffer_size)
+ when is_integer(max_size) and max_size > 0 do
+ GenServer.call(process, {:read_stdout, max_size}, :infinity)
end
- def read(process) do
- GenServer.call(process, {:read, :unbuffered}, :infinity)
+ @doc """
+ Returns bytes from executed command's stderr stream with maximum size `max_size`.
+
+ Blocks if no bytes have been written to the stderr stream yet, and returns as soon as bytes are available
+ """
+ @spec read_stderr(process, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
+ def read_stderr(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
+ GenServer.call(process, {:read_stderr, size}, :infinity)
+ end
+
+ @doc """
+ Returns bytes from either the stdout or stderr stream, whichever is available, with maximum size `max_size`.
+
+ Blocks if no bytes have been written to the stdout or stderr stream yet, and returns as soon as bytes are available
+ """
+ @spec read_any(process, pos_integer()) ::
+ {:ok, {:stdout, iodata}} | {:ok, {:stderr, iodata}} | :eof | {:error, any()}
+ def read_any(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
+ GenServer.call(process, {:read_any, size}, :infinity)
end
@doc """
Sends signal to external program
"""
@spec kill(process, :sigkill | :sigterm) :: :ok
def kill(process, signal) when signal in [:sigkill, :sigterm] do
GenServer.call(process, {:kill, signal}, :infinity)
end
@doc """
Waits for the program to terminate.
If the program terminates before timeout, it returns `{:ok, exit_status}` else returns `:timeout`
"""
@spec await_exit(process, timeout: timeout()) :: {:ok, integer()} | :timeout
def await_exit(process, timeout \\ :infinity) do
GenServer.call(process, {:await_exit, timeout}, :infinity)
end
@doc """
Returns os pid of the command
"""
@spec os_pid(process) :: pos_integer()
def os_pid(process) do
GenServer.call(process, :os_pid, :infinity)
end
@doc """
Stops the exile process, external program will be terminated in the background
"""
@spec stop(process) :: :ok
def stop(process), do: GenServer.call(process, :stop, :infinity)
## Server
- defmodule Pending do
- @moduledoc false
- defstruct bin: [], remaining: 0, client_pid: nil
- end
-
- defstruct [
- :args,
- :errno,
- :port,
- :socket_path,
- :stdin,
- :stdout,
- :context,
- :status,
- await: %{},
- pending_read: nil,
- pending_write: nil
- ]
-
- alias __MODULE__
-
def init(args) do
+ {use_stderr, args} = Map.pop(args, :use_stderr)
+
state = %__MODULE__{
args: args,
errno: nil,
status: :init,
await: %{},
- pending_read: %Pending{},
- pending_write: %Pending{}
+ use_stderr: use_stderr,
+ read_stdout: %Pending{},
+ read_stderr: %Pending{},
+ read_any: %Pending{},
+ write_stdin: %Pending{}
}
{:ok, state, {:continue, nil}}
end
def handle_continue(nil, state) do
Elixir.Process.flag(:trap_exit, true)
{:noreply, start_process(state)}
end
def handle_call(:stop, _from, state) do
# TODO: pending write and read should receive "stopped" return
# value instead of exit signal
case state.status do
- {:exit, _} -> :ok
- _ -> Port.close(state.port)
+ {:exit, _} ->
+ :ok
+
+ _ ->
+ Port.close(state.port)
end
{:stop, :normal, :ok, state}
end
def handle_call(:close_stdin, _from, state) do
case state.status do
{:exit, _} -> {:reply, :ok, state}
_ -> do_close(state, :stdin)
end
end
def handle_call({:await_exit, _}, _from, %{status: {:exit, status}} = state) do
{:reply, {:ok, {:exit, status}}, state}
end
def handle_call({:await_exit, timeout}, from, %{status: :start} = state) do
tref =
if timeout != :infinity do
Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
else
nil
end
{:noreply, %Process{state | await: Map.put(state.await, from, tref)}}
end
- def handle_call({:read, size}, from, state) do
- if state.pending_read.client_pid do
- {:reply, {:error, :pending_read}, state}
- else
- pending = %Pending{remaining: size, client_pid: from}
- do_read(%Process{state | pending_read: pending})
+ def handle_call({:read_stdout, size}, from, state) do
+ case can_read?(state, :stdout) do
+ :ok ->
+ pending = %Pending{size: size, client_pid: from}
+ do_read_stdout(%Process{state | read_stdout: pending})
+
+ error ->
+ GenServer.reply(from, error)
+ {:noreply, state}
+ end
+ end
+
+ def handle_call({:read_stderr, size}, from, state) do
+ case can_read?(state, :stderr) do
+ :ok ->
+ pending = %Pending{size: size, client_pid: from}
+ do_read_stderr(%Process{state | read_stderr: pending})
+
+ error ->
+ GenServer.reply(from, error)
+ {:noreply, state}
+ end
+ end
+
+ def handle_call({:read_any, size}, from, state) do
+ case can_read?(state, :any) do
+ :ok ->
+ pending = %Pending{size: size, client_pid: from}
+ do_read_any(%Process{state | read_any: pending})
+
+ error ->
+ GenServer.reply(from, error)
+ {:noreply, state}
end
end
def handle_call(_, _from, %{status: {:exit, status}} = state) do
{:reply, {:error, {:exit, status}}, state}
end
- def handle_call({:write, binary}, from, state) do
+ def handle_call({:write_stdin, binary}, from, state) do
cond do
!is_binary(binary) ->
{:reply, {:error, :not_binary}, state}
- state.pending_write.client_pid ->
- {:reply, {:error, :pending_write}, state}
+ state.write_stdin.client_pid ->
+ {:reply, {:error, :write_stdin}, state}
true ->
pending = %Pending{bin: binary, client_pid: from}
- do_write(%Process{state | pending_write: pending})
+ do_write(%Process{state | write_stdin: pending})
end
end
def handle_call(:os_pid, _from, state) do
case Port.info(state.port, :os_pid) do
{:os_pid, os_pid} ->
{:reply, {:ok, os_pid}, state}
:undefined ->
Logger.debug("Process not alive")
{:reply, :undefined, state}
end
end
def handle_call({:kill, signal}, _from, state) do
{:reply, signal(state.port, signal), state}
end
def handle_info({:await_exit_timeout, from}, state) do
GenServer.reply(from, :timeout)
{:noreply, %Process{state | await: Map.delete(state.await, from)}}
end
def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
- def handle_info({:select, _read_resource, _ref, :ready_input}, state), do: do_read(state)
+ def handle_info({:select, read_resource, _ref, :ready_input}, state) do
+ cond do
+ state.read_any.client_pid ->
+ stream =
+ cond do
+ read_resource == state.stdout -> :stdout
+ read_resource == state.stderr -> :stderr
+ end
+
+ do_read_any(state, stream)
+
+ state.read_stdout.client_pid && read_resource == state.stdout ->
+ do_read_stdout(state)
+
+ state.read_stderr.client_pid && read_resource == state.stderr ->
+ do_read_stderr(state)
+
+ true ->
+ {:noreply, state}
+ end
+ end
def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state),
do: handle_port_exit(exit_status, state)
def handle_info({:EXIT, port, :normal}, %{port: port} = state), do: {:noreply, state}
def handle_info({:EXIT, _, reason}, state), do: {:stop, reason, state}
defp handle_port_exit(exit_status, state) do
Enum.each(state.await, fn {from, tref} ->
GenServer.reply(from, {:ok, {:exit, exit_status}})
if tref do
Elixir.Process.cancel_timer(tref)
end
end)
{:noreply, %Process{state | status: {:exit, exit_status}, await: %{}}}
end
- defp do_write(%Process{pending_write: %Pending{bin: <<>>}} = state) do
- GenServer.reply(state.pending_write.client_pid, :ok)
- {:noreply, %{state | pending_write: %Pending{}}}
+ defmacrop eof, do: {:ok, <<>>}
+ defmacrop eagain, do: {:error, :eagain}
+
+ defp do_write(%Process{write_stdin: %Pending{bin: <<>>}} = state) do
+ reply_action(state, :write_stdin, :ok)
end
- defp do_write(%Process{pending_write: pending} = state) do
+ defp do_write(%Process{write_stdin: pending} = state) do
+ bin_size = byte_size(pending.bin)
+
case Nif.nif_write(state.stdin, pending.bin) do
- {:ok, size} ->
- if size < byte_size(pending.bin) do
- binary = binary_part(pending.bin, size, byte_size(pending.bin) - size)
- {:noreply, %{state | pending_write: %Pending{pending | bin: binary}}}
- else
- GenServer.reply(pending.client_pid, :ok)
- {:noreply, %{state | pending_write: %Pending{}}}
- end
+ {:ok, size} when size < bin_size ->
+ binary = binary_part(pending.bin, size, bin_size - size)
+ noreply_action(%{state | write_stdin: %Pending{pending | bin: binary}})
- {:error, :eagain} ->
- {:noreply, state}
+ {:ok, _size} ->
+ reply_action(state, :write_stdin, :ok)
+
+ eagain() ->
+ noreply_action(state)
{:error, errno} ->
- GenServer.reply(pending.client_pid, {:error, errno})
- {:noreply, %{state | errno: errno}}
+ reply_action(%Process{state | errno: errno}, :write_stdin, {:error, errno})
end
end
- defp do_read(%Process{pending_read: %Pending{remaining: :unbuffered} = pending} = state) do
- case Nif.nif_read(state.stdout, -1) do
- {:ok, <<>>} ->
- GenServer.reply(pending.client_pid, {:eof, []})
- {:noreply, %Process{state | pending_read: %Pending{}}}
+ defp do_read_stdout(%Process{read_stdout: pending} = state) do
+ case Nif.nif_read(state.stdout, pending.size) do
+ eof() ->
+ reply_action(state, :read_stdout, :eof)
{:ok, binary} ->
- GenServer.reply(pending.client_pid, {:ok, binary})
- {:noreply, %Process{state | pending_read: %Pending{}}}
+ reply_action(state, :read_stdout, {:ok, binary})
- {:error, :eagain} ->
- {:noreply, state}
+ eagain() ->
+ noreply_action(state)
{:error, errno} ->
- GenServer.reply(pending.client_pid, {:error, errno})
- {:noreply, %Process{state | pending_read: %Pending{}, errno: errno}}
+ reply_action(%Process{state | errno: errno}, :read_stdout, {:error, errno})
end
end
- defp do_read(%Process{pending_read: pending} = state) do
- case Nif.nif_read(state.stdout, pending.remaining) do
- {:ok, <<>>} ->
- GenServer.reply(pending.client_pid, {:eof, pending.bin})
- {:noreply, %Process{state | pending_read: %Pending{}}}
+ defp do_read_stderr(%Process{read_stderr: pending} = state) do
+ case Nif.nif_read(state.stderr, pending.size) do
+ eof() ->
+ reply_action(state, :read_stderr, :eof)
{:ok, binary} ->
- if byte_size(binary) < pending.remaining do
- pending = %Pending{
- pending
- | bin: [pending.bin | binary],
- remaining: pending.remaining - byte_size(binary)
- }
-
- {:noreply, %Process{state | pending_read: pending}}
- else
- GenServer.reply(pending.client_pid, {:ok, [state.pending_read.bin | binary]})
- {:noreply, %Process{state | pending_read: %Pending{}}}
- end
+ reply_action(state, :read_stderr, {:ok, binary})
- {:error, :eagain} ->
- {:noreply, state}
+ eagain() ->
+ noreply_action(state)
{:error, errno} ->
- GenServer.reply(pending.client_pid, {:error, errno})
- {:noreply, %{state | pending_read: %Pending{}, errno: errno}}
+ reply_action(%Process{state | errno: errno}, :read_stderr, {:error, errno})
end
end
- defp do_close(state, type) do
- fd =
- if type == :stdin do
- state.stdin
- else
- state.stdout
+ defp do_read_any(state, stream_hint \\ :stdout) do
+ %Process{read_any: pending, use_stderr: use_stderr} = state
+
+ other_stream =
+ case stream_hint do
+ :stdout -> :stderr
+ :stderr -> :stdout
end
- case Nif.nif_close(fd) do
- :ok ->
- {:reply, :ok, state}
+ case Nif.nif_read(stream_fd(state, stream_hint), pending.size) do
+ ret when ret in [eof(), eagain()] and use_stderr == true ->
+ case {ret, Nif.nif_read(stream_fd(state, other_stream), pending.size)} do
+ {eof(), eof()} ->
+ reply_action(state, :read_any, :eof)
- {:error, errno} ->
- # FIXME: correct
- raise errno
- {:reply, {:error, errno}, %Process{state | errno: errno}}
- end
- end
+ {_, {:ok, binary}} ->
+ reply_action(state, :read_any, {:ok, {other_stream, binary}})
- defp normalize_cmd([cmd | _]) when is_binary(cmd) do
- path = System.find_executable(cmd)
+ {_, eagain()} ->
+ noreply_action(state)
- if path do
- {:ok, to_charlist(path)}
- else
- {:error, "command not found: #{inspect(cmd)}"}
- end
- end
+ {_, {:error, errno}} ->
+ reply_action(%Process{state | errno: errno}, :read_any, {:error, errno})
+ end
- defp normalize_cmd(_cmd_with_args) do
- {:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
- end
+ eof() ->
+ reply_action(state, :read_any, :eof)
- defp normalize_cmd_args([_ | args]) do
- if is_list(args) && Enum.all?(args, &is_binary/1) do
- {:ok, Enum.map(args, &to_charlist/1)}
- else
- {:error, "command arguments must be list of strings. #{inspect(args)}"}
- end
- end
+ {:ok, binary} ->
+ reply_action(state, :read_any, {:ok, {stream_hint, binary}})
- defp normalize_cd(nil), do: {:ok, ''}
+ eagain() ->
+ noreply_action(state)
- defp normalize_cd(cd) do
- if File.exists?(cd) && File.dir?(cd) do
- {:ok, to_charlist(cd)}
- else
- {:error, "`:cd` must be valid directory path"}
+ {:error, errno} ->
+ reply_action(%Process{state | errno: errno}, :read_any, {:error, errno})
end
end
- defp normalize_env(nil), do: {:ok, []}
-
- defp normalize_env(env) do
- env =
- Enum.map(env, fn {key, value} ->
- {to_charlist(key), to_charlist(value)}
- end)
-
- {:ok, env}
+ defp do_close(state, stream) do
+ ret = Nif.nif_close(stream_fd(state, stream))
+ {:reply, ret, state}
end
- defp validate_opts_fields(opts) do
- {_, additional_opts} = Keyword.split(opts, [:cd, :env])
-
- if Enum.empty?(additional_opts) do
- :ok
- else
- {:error, "invalid opts: #{inspect(additional_opts)}"}
+ defp stream_fd(state, stream) do
+ case stream do
+ :stdin -> state.stdin
+ :stdout -> state.stdout
+ :stderr -> state.stderr
end
end
- defp normalize_args(cmd_with_args, opts) do
- with {:ok, cmd} <- normalize_cmd(cmd_with_args),
- {:ok, args} <- normalize_cmd_args(cmd_with_args),
- :ok <- validate_opts_fields(opts),
- {:ok, cd} <- normalize_cd(opts[:cd]),
- {:ok, env} <- normalize_env(opts[:env]) do
- {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env}}
+ defp can_read?(state, :stdout) do
+ cond do
+ state.read_stdout.client_pid ->
+ {:error, :pending_stdout_read}
+
+ true ->
+ :ok
end
end
- @spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
-
- defp exec(cmd_with_args, socket_path, env, cd) do
- opts = []
- opts = if cd, do: [{:cd, cd} | opts], else: []
- opts = if env, do: [{:env, env} | opts], else: opts
+ defp can_read?(state, :stderr) do
+ cond do
+ !state.use_stderr ->
+ {:error, :cannot_read_stderr}
- opts =
- [
- :nouse_stdio,
- :exit_status,
- :binary,
- args: [socket_path | cmd_with_args]
- ] ++ opts
+ state.read_stderr.client_pid ->
+ {:error, :pending_stderr_read}
- Port.open({:spawn_executable, @spawner_path}, opts)
+ true ->
+ :ok
+ end
end
- defp socket_path do
- str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
- path = Path.join(System.tmp_dir!(), str)
- _ = :file.delete(path)
- path
+ defp can_read?(state, :any) do
+ with :ok <- can_read?(state, :stdout) do
+ if state.use_stderr do
+ can_read?(state, :stderr)
+ else
+ :ok
+ end
+ end
end
defp signal(port, sig) when sig in [:sigkill, :sigterm] do
case Port.info(port, :os_pid) do
{:os_pid, os_pid} -> Nif.nif_kill(os_pid, sig)
:undefined -> {:error, :process_not_alive}
end
end
- defp start_process(%{args: %{cmd_with_args: cmd_with_args, cd: cd, env: env}} = state) do
- path = socket_path()
+ @spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
+
+ defp start_process(state) do
+ %{args: %{cmd_with_args: cmd_with_args, cd: cd, env: env}, use_stderr: use_stderr} = state
+
+ socket_path = socket_path()
{:ok, sock} = :socket.open(:local, :stream, :default)
try do
- :ok = socket_bind(sock, path)
+ :ok = socket_bind(sock, socket_path)
:ok = :socket.listen(sock)
- port = exec(cmd_with_args, path, env, cd)
+ spawner_cmdline_args = [socket_path, to_string(use_stderr) | cmd_with_args]
+
+ port_opts =
+ [:nouse_stdio, :exit_status, :binary, args: spawner_cmdline_args] ++
+ prune_nils(env: env, cd: cd)
+
+ port = Port.open({:spawn_executable, @spawner_path}, port_opts)
+
{:os_pid, os_pid} = Port.info(port, :os_pid)
- Exile.Watcher.watch(self(), os_pid, path)
+ Exile.Watcher.watch(self(), os_pid, socket_path)
- {stdout, stdin} = receive_fds(sock)
+ {stdin, stdout, stderr} = receive_fds(sock, state.use_stderr)
%Process{
state
| port: port,
status: :start,
- socket_path: path,
+ socket_path: socket_path,
stdin: stdin,
- stdout: stdout
+ stdout: stdout,
+ stderr: stderr
}
after
:socket.close(sock)
end
end
@socket_timeout 2000
- defp receive_fds(lsock) do
+
+ defp receive_fds(lsock, use_stderr) do
{:ok, sock} = :socket.accept(lsock, @socket_timeout)
try do
- case :socket.recvmsg(sock, @socket_timeout) do
- {:ok, msg} ->
- %{
- ctrl: [
- %{
- data: <<stdin_fd_int::native-32, stdout_fd_int::native-32, _::binary>>,
- level: :socket,
- type: :rights
- }
- ]
- } = msg
-
- with {:ok, stdout} <- Nif.nif_create_fd(stdout_fd_int),
- {:ok, stdin} <- Nif.nif_create_fd(stdin_fd_int) do
- {stdout, stdin}
- else
- error ->
- raise Error,
- message: "Failed to create fd resources\n error: #{inspect(error)}"
- end
+ {:ok, msg} = :socket.recvmsg(sock, @socket_timeout)
+ %{ctrl: [%{data: data, level: :socket, type: :rights}]} = msg
- {:error, reason} ->
- raise Error,
- message:
- "Failed to receive stdin and stdout file descriptors\n error: #{inspect(reason)}"
- end
+ <<stdin_fd::native-32, stdout_fd::native-32, stderr_fd::native-32, _::binary>> = data
+
+ {:ok, stdout} = Nif.nif_create_fd(stdout_fd)
+ {:ok, stdin} = Nif.nif_create_fd(stdin_fd)
+
+ {:ok, stderr} =
+ if use_stderr do
+ Nif.nif_create_fd(stderr_fd)
+ else
+ {:ok, nil}
+ end
+
+ {stdin, stdout, stderr}
after
:socket.close(sock)
end
end
defp socket_bind(sock, path) do
case :socket.bind(sock, %{family: :local, path: path}) do
:ok -> :ok
# for OTP version <= 24 compatibility
{:ok, _} -> :ok
other -> other
end
end
+
+ defp socket_path do
+ str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
+ path = Path.join(System.tmp_dir!(), str)
+ _ = :file.delete(path)
+ path
+ end
+
+ defp prune_nils(kv) do
+ Enum.reject(kv, fn {_, v} -> is_nil(v) end)
+ end
+
+ defp reply_action(state, action, return_value) do
+ pending = Map.fetch!(state, action)
+
+ :ok = GenServer.reply(pending.client_pid, return_value)
+ {:noreply, Map.put(state, action, %Pending{})}
+ end
+
+ defp noreply_action(state) do
+ {:noreply, state}
+ end
+
+ defp normalize_cmd(arg) do
+ case arg do
+ [cmd | _] when is_binary(cmd) ->
+ path = System.find_executable(cmd)
+
+ if path do
+ {:ok, to_charlist(path)}
+ else
+ {:error, "command not found: #{inspect(cmd)}"}
+ end
+
+ _ ->
+ {:error, "`cmd_with_args` must be a list of strings. Please check the documentation"}
+ end
+ end
+
+ defp normalize_cmd_args([_ | args]) do
+ if is_list(args) && Enum.all?(args, &is_binary/1) do
+ {:ok, Enum.map(args, &to_charlist/1)}
+ else
+ {:error, "command arguments must be list of strings. #{inspect(args)}"}
+ end
+ end
+
+ defp normalize_cd(cd) do
+ case cd do
+ nil ->
+ {:ok, ''}
+
+ cd when is_binary(cd) ->
+ if File.exists?(cd) && File.dir?(cd) do
+ {:ok, to_charlist(cd)}
+ else
+ {:error, "`:cd` must be a valid directory path"}
+ end
+
+ _ ->
+ {:error, "`:cd` must be a binary string"}
+ end
+ end
+
+ defp normalize_env(env) do
+ case env do
+ nil ->
+ {:ok, []}
+
+ env when is_list(env) or is_map(env) ->
+ env =
+ Enum.map(env, fn {key, value} ->
+ {to_charlist(key), to_charlist(value)}
+ end)
+
+ {:ok, env}
+
+ _ ->
+ {:error, "`:env` must be a map or list of `{string, string}`"}
+ end
+ end
+
+ defp normalize_use_stderr(use_stderr) do
+ case use_stderr do
+ nil ->
+ {:ok, false}
+
+ use_stderr when is_boolean(use_stderr) ->
+ {:ok, use_stderr}
+
+ _ ->
+ {:error, ":use_stderr must be a boolean"}
+ end
+ end
+
+ defp validate_opts_fields(opts) do
+ {_, additional_opts} = Keyword.split(opts, [:cd, :env, :use_stderr])
+
+ if Enum.empty?(additional_opts) do
+ :ok
+ else
+ {:error, "invalid opts: #{inspect(additional_opts)}"}
+ end
+ end
+
+ defp normalize_args(cmd_with_args, opts) do
+ with {:ok, cmd} <- normalize_cmd(cmd_with_args),
+ {:ok, args} <- normalize_cmd_args(cmd_with_args),
+ :ok <- validate_opts_fields(opts),
+ {:ok, cd} <- normalize_cd(opts[:cd]),
+ {:ok, use_stderr} <- normalize_use_stderr(opts[:use_stderr]),
+ {:ok, env} <- normalize_env(opts[:env]) do
+ {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, use_stderr: use_stderr}}
+ end
+ end
end
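A rough sketch of the fallback order that `do_read_any/2` appears to follow: read the hinted stream first and, when stderr is enabled and that read hits EOF or `:eagain`, try the other stream. `ReadAnySketch`, the injected `read` function, and the `:wait` return value are hypothetical stand-ins (the real code calls `Nif.nif_read/2` and leaves the caller pending instead of returning). Unlike the diff, the sketch keeps waiting when the hinted stream returned `:eagain` and the other stream is at EOF; that clause is an assumption of the sketch, not something the patch does.

```elixir
defmodule ReadAnySketch do
  @moduledoc false

  # `read` is a hypothetical 1-arity function (stream -> result) standing in
  # for the NIF read. `:wait` stands in for "no reply yet, wait for select".
  def read_any(read, hint, use_stderr) when hint in [:stdout, :stderr] do
    other = if hint == :stdout, do: :stderr, else: :stdout

    case read.(hint) do
      # Hinted stream has nothing to offer right now: try the other one,
      # but only when stderr was requested at start-up.
      ret when ret in [{:ok, <<>>}, {:error, :eagain}] and use_stderr ->
        case {ret, read.(other)} do
          {{:ok, <<>>}, {:ok, <<>>}} -> :eof
          # Assumption of this sketch: other stream ended, hinted one may still produce data
          {{:error, :eagain}, {:ok, <<>>}} -> :wait
          {_, {:ok, bin}} -> {:ok, {other, bin}}
          {_, {:error, :eagain}} -> :wait
          {_, {:error, errno}} -> {:error, errno}
        end

      {:ok, <<>>} -> :eof
      {:ok, bin} -> {:ok, {hint, bin}}
      {:error, :eagain} -> :wait
      {:error, errno} -> {:error, errno}
    end
  end
end

# Fake reads: stdout would block, stderr has data ready.
reads = %{stdout: {:error, :eagain}, stderr: {:ok, "bar\n"}}
result = ReadAnySketch.read_any(&Map.fetch!(reads, &1), :stdout, true)
IO.inspect(result)
```

Passing the read function in keeps the decision logic testable without a running external program.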
diff --git a/lib/exile/process_nif.ex b/lib/exile/process_nif.ex
index 7607274..2da0f15 100644
--- a/lib/exile/process_nif.ex
+++ b/lib/exile/process_nif.ex
@@ -1,21 +1,21 @@
defmodule Exile.ProcessNif do
@moduledoc false
@on_load :load_nifs
def load_nifs do
nif_path = :filename.join(:code.priv_dir(:exile), "exile")
:erlang.load_nif(nif_path, 0)
end
def nif_is_os_pid_alive(_os_pid), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_kill(_os_pid, _signal), do: :erlang.nif_error(:nif_library_not_loaded)
- def nif_read(_fd, _request), do: :erlang.nif_error(:nif_library_not_loaded)
+ def nif_read(_fd, _max_size), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_create_fd(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_close(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_write(_fd, _bin), do: :erlang.nif_error(:nif_library_not_loaded)
end
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index bf26c08..f5c0ade 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,170 +1,202 @@
defmodule Exile.Stream do
@moduledoc """
Defines an `Exile.Stream` struct returned by `Exile.stream!/3`.
"""
alias Exile.Process
defmodule Sink do
defstruct [:process]
defimpl Collectable do
def into(%{process: process} = stream) do
collector_fun = fn
:ok, {:cont, x} ->
:ok = Process.write(process, x)
:ok, :done ->
:ok = Process.close_stdin(process)
stream
:ok, :halt ->
:ok = Process.close_stdin(process)
end
{:ok, collector_fun}
end
end
end
defstruct [:process, :stream_opts]
@type t :: %__MODULE__{}
@doc false
def __build__(cmd_with_args, opts) do
- {stream_opts, process_opts} = Keyword.split(opts, [:exit_timeout, :chunk_size, :input])
+ {stream_opts, process_opts} =
+ Keyword.split(opts, [:exit_timeout, :max_chunk_size, :input, :use_stderr])
with {:ok, stream_opts} <- normalize_stream_opts(stream_opts) do
+ process_opts = Keyword.put(process_opts, :use_stderr, stream_opts[:use_stderr])
{:ok, process} = Process.start_link(cmd_with_args, process_opts)
start_input_streamer(%Sink{process: process}, stream_opts.input)
%Exile.Stream{process: process, stream_opts: stream_opts}
else
{:error, error} -> raise ArgumentError, message: error
end
end
@doc false
defp start_input_streamer(sink, input) do
case input do
:no_input ->
:ok
{:enumerable, enum} ->
spawn_link(fn ->
Enum.into(enum, sink)
end)
{:collectable, func} ->
spawn_link(fn ->
func.(sink)
end)
end
end
defimpl Enumerable do
- def reduce(%{process: process, stream_opts: stream_opts}, acc, fun) do
- start_fun = fn -> :ok end
+ def reduce(arg, acc, fun) do
+ %{process: process, stream_opts: %{use_stderr: use_stderr} = stream_opts} = arg
- next_fun = fn :ok ->
- case Process.read(process, stream_opts.chunk_size) do
- {:eof, []} ->
+ start_fun = fn -> :normal end
+
+ next_fun = fn :normal ->
+ case Process.read_any(process, stream_opts.max_chunk_size) do
+ :eof ->
{:halt, :normal}
- {:eof, x} ->
- # multiple reads on closed pipe always returns :eof
- {[IO.iodata_to_binary(x)], :ok}
+ {:ok, {:stdout, x}} when use_stderr == false ->
+ {[IO.iodata_to_binary(x)], :normal}
- {:ok, x} ->
- {[IO.iodata_to_binary(x)], :ok}
+ {:ok, {stream, x}} when use_stderr == true ->
+ {[{stream, IO.iodata_to_binary(x)}], :normal}
{:error, errno} ->
raise "Failed to read from the process. errno: #{errno}"
end
end
after_fun = fn exit_type ->
try do
# always close stdin before stopping to give the command a chance to exit properly
Process.close_stdin(process)
result = Process.await_exit(process, stream_opts.exit_timeout)
case {exit_type, result} do
{_, :timeout} ->
Process.kill(process, :sigkill)
raise "command fail to exit within timeout: #{stream_opts[:exit_timeout]}"
{:normal, {:ok, {:exit, 0}}} ->
:ok
{:normal, {:ok, error}} ->
raise "command exited with status: #{inspect(error)}"
{exit_type, error} ->
Process.kill(process, :sigkill)
raise "command exited with exit_type: #{exit_type}, error: #{inspect(error)}"
end
after
Process.stop(process)
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
def count(_stream) do
{:error, __MODULE__}
end
def member?(_stream, _term) do
{:error, __MODULE__}
end
def slice(_stream) do
{:error, __MODULE__}
end
end
defp normalize_input(term) do
cond do
is_nil(term) ->
{:ok, :no_input}
!is_function(term) && Enumerable.impl_for(term) ->
{:ok, {:enumerable, term}}
is_function(term, 1) ->
{:ok, {:collectable, term}}
true ->
{:error, "`:input` must be either Enumerable or a function which accepts collectable"}
end
end
- defp normalize_chunk_size(nil), do: {:ok, 65536}
- defp normalize_chunk_size(:no_buffering), do: {:ok, :no_buffering}
+ defp normalize_max_chunk_size(max_chunk_size) do
+ case max_chunk_size do
+ nil ->
+ {:ok, 65536}
+
+ max_chunk_size when is_integer(max_chunk_size) and max_chunk_size > 0 ->
+ {:ok, max_chunk_size}
+
+ _ ->
+ {:error, ":max_chunk_size must be a positive integer"}
+ end
+ end
+
+ defp normalize_exit_timeout(timeout) do
+ case timeout do
+ timeout when timeout in [nil, :infinity] ->
+ {:ok, :infinity}
+
+ timeout when is_integer(timeout) and timeout > 0 ->
+ {:ok, timeout}
- defp normalize_chunk_size(chunk_size) do
- if is_integer(chunk_size) and chunk_size > 0,
- do: {:ok, chunk_size},
- else: {:error, ":exit_timeout must be either :infinity or a positive integer"}
+ _ ->
+ {:error, ":exit_timeout must be either :infinity or a positive integer"}
+ end
end
- defp normalize_exit_timeout(term) when term in [nil, :infinity], do: {:ok, :infinity}
+ defp normalize_use_stderr(use_stderr) do
+ case use_stderr do
+ nil ->
+ {:ok, false}
- defp normalize_exit_timeout(term) do
- if is_integer(term),
- do: {:ok, term},
- else: {:error, ":exit_timeout must be either :infinity or an integer"}
+ use_stderr when is_boolean(use_stderr) ->
+ {:ok, use_stderr}
+
+ _ ->
+ {:error, ":use_stderr must be a boolean"}
+ end
end
defp normalize_stream_opts(opts) when is_list(opts) do
with {:ok, input} <- normalize_input(opts[:input]),
{:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]),
- {:ok, chunk_size} <- normalize_chunk_size(opts[:chunk_size]) do
- {:ok, %{input: input, exit_timeout: exit_timeout, chunk_size: chunk_size}}
+ {:ok, max_chunk_size} <- normalize_max_chunk_size(opts[:max_chunk_size]),
+ {:ok, use_stderr} <- normalize_use_stderr(opts[:use_stderr]) do
+ {:ok,
+ %{
+ input: input,
+ exit_timeout: exit_timeout,
+ max_chunk_size: max_chunk_size,
+ use_stderr: use_stderr
+ }}
end
end
defp normalize_stream_opts(_), do: {:error, "stream_opts must be a keyword list"}
end
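With `use_stderr: true`, the `Enumerable` implementation in this file yields `{:stdout, binary}` and `{:stderr, binary}` tuples rather than bare binaries. A minimal sketch of how a consumer might separate the two; `TaggedChunksSketch` and the hard-coded `chunks` list are hypothetical stand-ins for a real stream, so nothing here requires Exile.

```elixir
defmodule TaggedChunksSketch do
  @moduledoc false

  # Hypothetical helper, not part of Exile: groups tagged chunks by stream
  # and returns one binary per stream.
  @spec split(Enumerable.t()) :: %{stdout: binary(), stderr: binary()}
  def split(chunks) do
    acc =
      Enum.reduce(chunks, %{stdout: [], stderr: []}, fn {stream, bin}, acc
                                                        when stream in [:stdout, :stderr] and
                                                               is_binary(bin) ->
        # Accumulate as iodata so each chunk is appended without copying
        Map.update!(acc, stream, fn io -> [io, bin] end)
      end)

    %{
      stdout: IO.iodata_to_binary(acc.stdout),
      stderr: IO.iodata_to_binary(acc.stderr)
    }
  end
end

# Stand-in for the elements a stream started with `use_stderr: true` would emit
chunks = [{:stdout, "foo\n"}, {:stderr, "warn: "}, {:stdout, "bar\n"}, {:stderr, "oops\n"}]
result = TaggedChunksSketch.split(chunks)
IO.inspect(result)
```

Collecting everything into memory gives up back-pressure, so this only suits small outputs; a streaming consumer would pattern-match on the tag inside `Stream.each/2` or similar instead.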
diff --git a/mix.exs b/mix.exs
index 648b1f5..6d5ee59 100644
--- a/mix.exs
+++ b/mix.exs
@@ -1,56 +1,56 @@
defmodule Exile.MixProject do
use Mix.Project
def project do
[
app: :exile,
version: "0.1.0",
elixir: "~> 1.7",
start_permanent: Mix.env() == :prod,
compilers: [:elixir_make] ++ Mix.compilers(),
make_targets: ["all"],
make_clean: ["clean"],
deps: deps(),
# Package
package: package(),
description: description(),
# Docs
source_url: "https://github.com/akash-akya/exile",
homepage_url: "https://github.com/akash-akya/exile",
docs: [
main: "readme",
extras: ["README.md"]
]
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
mod: {Exile, []},
- extra_applications: [:logger]
+ extra_applications: [:logger, :crypto]
]
end
defp description do
"NIF based solution to interact with external programs with back-pressure"
end
defp package do
[
maintainers: ["Akash Hiremath"],
licenses: ["Apache-2.0"],
links: %{GitHub: "https://github.com/akash-akya/exile"}
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:elixir_make, "~> 0.6", runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev}
]
end
end
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index ab923e1..315891d 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,366 +1,410 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
alias Exile.Process
test "read" do
{:ok, s} = Process.start_link(~w(echo test))
- assert {:eof, iodata} = Process.read(s, 100)
+ assert {:ok, iodata} = Process.read(s, 100)
+ assert :eof = Process.read(s, 100)
assert IO.iodata_to_binary(iodata) == "test\n"
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "write" do
{:ok, s} = Process.start_link(~w(cat))
assert :ok == Process.write(s, "hello")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "hello"
assert :ok == Process.write(s, "world")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "world"
assert :ok == Process.close_stdin(s)
- assert {:eof, []} == Process.read(s)
+ assert :eof == Process.read(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
end
test "stdin close" do
logger = start_events_collector()
# base64 produces output only after getting EOF from stdin. we
# collect events in order and assert that we can still read from
# stdout even after closing stdin
{:ok, s} = Process.start_link(~w(base64))
# parallel reader should be blocked till we close stdin
start_parallel_reader(s, logger)
:timer.sleep(100)
assert :ok == Process.write(s, "hello")
add_event(logger, {:write, "hello"})
assert :ok == Process.write(s, "world")
add_event(logger, {:write, "world"})
:timer.sleep(100)
assert :ok == Process.close_stdin(s)
add_event(logger, :input_close)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
assert [
{:write, "hello"},
{:write, "world"},
:input_close,
{:read, "aGVsbG93b3JsZA==\n"},
:eof
] == get_events(logger)
end
test "external command termination on stop" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
:timer.sleep(100)
refute os_process_alive?(os_pid)
end
test "external command kill on stop" do
{:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
if os_process_alive?(os_pid) do
:timer.sleep(1000)
refute os_process_alive?(os_pid)
else
:ok
end
end
test "exit status" do
{:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
assert {:ok, {:exit, 10}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "writing binary larger than pipe buffer size" do
large_bin = generate_binary(5 * 65535)
{:ok, s} = Process.start_link(~w(cat))
writer =
Task.async(fn ->
Process.write(s, large_bin)
Process.close_stdin(s)
end)
:timer.sleep(100)
- {_, iodata} = Process.read(s, 5 * 65535)
+ iodata =
+ Stream.unfold(nil, fn _ ->
+ case Process.read(s) do
+ {:ok, data} -> {data, nil}
+ :eof -> nil
+ end
+ end)
+ |> Enum.to_list()
+
Task.await(writer)
assert IO.iodata_length(iodata) == 5 * 65535
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
end
+ test "stderr_read" do
+ {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], use_stderr: true)
+ assert {:ok, "foo\n"} = Process.read_stderr(s, 100)
+ Process.stop(s)
+ end
+
+ test "stderr_read with stderr disabled" do
+ {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], use_stderr: false)
+ assert {:error, :cannot_read_stderr} = Process.read_stderr(s, 100)
+ Process.stop(s)
+ end
+
+ test "stderr_any" do
+ script = """
+ echo "foo"
+ echo "bar" >&2
+ """
+
+ {:ok, s} = Process.start_link(["sh", "-c", script], use_stderr: true)
+ {:ok, ret1} = Process.read_any(s, 100)
+ {:ok, ret2} = Process.read_any(s, 100)
+
+ assert {:stderr, "bar\n"} in [ret1, ret2]
+ assert {:stdout, "foo\n"} in [ret1, ret2]
+
+ assert :eof = Process.read_any(s, 100)
+ Process.stop(s)
+ end
+
+ test "stderr_any with stderr disabled" do
+ script = """
+ echo "foo"
+ echo "bar" >&2
+ """
+
+ {:ok, s} = Process.start_link(["sh", "-c", script], use_stderr: false)
+ {:ok, ret1} = Process.read_any(s, 100)
+
+ assert ret1 == {:stdout, "foo\n"}
+
+ assert :eof = Process.read_any(s, 100)
+ Process.stop(s)
+ end
+
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
{:ok, s} = Process.start_link(~w(cat))
large_bin = generate_binary(65535 * 5)
writer =
Task.async(fn ->
- Enum.each(1..10, fn i ->
- Process.write(s, large_bin)
- add_event(logger, {:write, i})
- end)
-
+ :ok = Process.write(s, large_bin)
+ add_event(logger, {:write, IO.iodata_length(large_bin)})
Process.close_stdin(s)
end)
:timer.sleep(50)
reader =
Task.async(fn ->
- Enum.each(1..10, fn i ->
- Process.read(s, 5 * 65535)
- add_event(logger, {:read, i})
- # delay in reading should delay writes
- :timer.sleep(10)
+ Stream.unfold(nil, fn _ ->
+ case Process.read(s) do
+ {:ok, data} ->
+ add_event(logger, {:read, IO.iodata_length(data)})
+ # delay in reading should delay writes
+ :timer.sleep(10)
+ {nil, nil}
+
+ :eof ->
+ nil
+ end
end)
+ |> Stream.run()
end)
Task.await(writer)
Task.await(reader)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
- assert [
- write: 1,
- read: 1,
- write: 2,
- read: 2,
- write: 3,
- read: 3,
- write: 4,
- read: 4,
- write: 5,
- read: 5,
- write: 6,
- read: 6,
- write: 7,
- read: 7,
- write: 8,
- read: 8,
- write: 9,
- read: 9,
- write: 10,
- read: 10
- ] == get_events(logger)
+ events = get_events(logger)
+
+ {write_events, read_events} = Enum.split_with(events, &match?({:write, _}, &1))
+
+ assert Enum.sum(Enum.map(read_events, fn {:read, size} -> size end)) ==
+ Enum.sum(Enum.map(write_events, fn {:write, size} -> size end))
+
+ # There must be a read before write completes
+ assert hd(events) == {:read, 65535}
end
# this test does not work properly in linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
Process.stop(s)
open_files = parse_lsof(bin)
assert [%{fd: "0", name: _, type: "PIPE"}, %{type: "PIPE", fd: "1", name: _}] = open_files
end
test "process kill with pending write" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
large_data =
Stream.cycle(["test"]) |> Stream.take(500_000) |> Enum.to_list() |> IO.iodata_to_binary()
task =
Task.async(fn ->
try do
Process.write(s, large_data)
catch
:exit, reason -> reason
end
end)
:timer.sleep(200)
Process.stop(s)
:timer.sleep(3000)
refute os_process_alive?(os_pid)
assert {:normal, _} = Task.await(task)
end
test "concurrent read" do
{:ok, s} = Process.start_link(~w(cat))
task = Task.async(fn -> Process.read(s, 1) end)
# delaying concurrent read to avoid race-condition
Elixir.Process.sleep(100)
- assert {:error, :pending_read} = Process.read(s, 1)
+ assert {:error, :pending_stdout_read} = Process.read(s, 1)
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
_ = Task.await(task)
end
test "cd" do
parent = Path.expand("..", File.cwd!())
{:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
{:ok, dir} = Process.read(s)
assert String.trim(dir) == parent
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "invalid path" do
assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
end
test "invalid opt" do
assert {:error, "invalid opts: [invalid: :test]"} =
Process.start_link(~w(cat), invalid: :test)
end
test "env" do
assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
assert {:ok, "test\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "if external process inherits beam env" do
:ok = System.put_env([{"BEAM_ENV_A", "10"}])
assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
assert {:ok, "10\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "if user env overrides beam env" do
:ok = System.put_env([{"BEAM_ENV", "base"}])
assert {:ok, s} =
Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
assert {:ok, "overridden\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "await_exit when process is stopped" do
assert {:ok, s} = Process.start_link(~w(cat))
tasks =
Enum.map(1..10, fn _ ->
Task.async(fn -> Process.await_exit(s) end)
end)
assert :ok == Process.close_stdin(s)
Elixir.Process.sleep(100)
Enum.each(tasks, fn task ->
assert {:ok, {:exit, 0}} = Task.await(task)
end)
Process.stop(s)
end
def start_parallel_reader(proc_server, logger) do
spawn_link(fn -> reader_loop(proc_server, logger) end)
end
def reader_loop(proc_server, logger) do
case Process.read(proc_server) do
{:ok, data} ->
add_event(logger, {:read, data})
reader_loop(proc_server, logger)
- {:eof, []} ->
+ :eof ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
Stream.repeatedly(fn -> "A" end) |> Enum.take(size) |> IO.iodata_to_binary()
end
end
diff --git a/test/exile_test.exs b/test/exile_test.exs
index 3fdae15..8f03e51 100644
--- a/test/exile_test.exs
+++ b/test/exile_test.exs
@@ -1,23 +1,62 @@
defmodule ExileTest do
use ExUnit.Case
test "stream with enumerable" do
- proc_stream = Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end))
- output = proc_stream |> Enum.to_list()
- assert IO.iodata_length(output) == 1000
+ proc_stream =
+ Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end), use_stderr: false)
+
+ stdout = Enum.to_list(proc_stream)
+ assert IO.iodata_length(stdout) == 1000
end
test "stream with collectable" do
proc_stream =
Exile.stream!(["cat"], input: fn sink -> Enum.into(1..1000, sink, fn _ -> "a" end) end)
- output = proc_stream |> Enum.to_list()
- assert IO.iodata_length(output) == 1000
+ stdout = Enum.to_list(proc_stream)
+ assert IO.iodata_length(stdout) == 1000
end
test "stream without stdin" do
proc_stream = Exile.stream!(~w(echo hello))
- output = proc_stream |> Enum.to_list()
- assert IO.iodata_to_binary(output) == "hello\n"
+ stdout = Enum.to_list(proc_stream)
+ assert IO.iodata_to_binary(stdout) == "hello\n"
+ end
+
+ test "stderr" do
+ proc_stream = Exile.stream!(["sh", "-c", "echo foo >>/dev/stderr"], use_stderr: true)
+
+ assert {[], stderr} = split_stream(proc_stream)
+ assert IO.iodata_to_binary(stderr) == "foo\n"
+ end
+
+ test "multiple streams" do
+ script = """
+ for i in {1..1000}; do
+ echo "foo ${i}"
+ echo "bar ${i}" >&2
+ done
+ """
+
+ proc_stream = Exile.stream!(["sh", "-c", script], use_stderr: true)
+
+ {stdout, stderr} = split_stream(proc_stream)
+
+ stdout_lines = String.split(Enum.join(stdout), "\n", trim: true)
+ stderr_lines = String.split(Enum.join(stderr), "\n", trim: true)
+
+ assert length(stdout_lines) == length(stderr_lines)
+ assert Enum.all?(stdout_lines, &String.starts_with?(&1, "foo "))
+ assert Enum.all?(stderr_lines, &String.starts_with?(&1, "bar "))
+ end
+
+ defp split_stream(stream) do
+ {stdout, stderr} =
+ Enum.reduce(stream, {[], []}, fn
+ {:stdout, data}, {stdout, stderr} -> {[data | stdout], stderr}
+ {:stderr, data}, {stdout, stderr} -> {stdout, [data | stderr]}
+ end)
+
+ {Enum.reverse(stdout), Enum.reverse(stderr)}
end
end
