Page MenuHomePhorge

No OneTemporary

Size
45 KB
Referenced Files
None
Subscribers
None
diff --git a/c_src/exile.c b/c_src/exile.c
index 48e0e88..a3c65bf 100644
--- a/c_src/exile.c
+++ b/c_src/exile.c
@@ -1,487 +1,392 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "erl_nif.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#ifdef ERTS_DIRTY_SCHEDULERS
#define USE_DIRTY_IO ERL_NIF_DIRTY_JOB_IO_BOUND
#else
#define USE_DIRTY_IO 0
#endif
//#define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
#define start_timing() ErlNifTime __start = enif_monotonic_time(ERL_NIF_USEC)
#define elapsed_microseconds() (enif_monotonic_time(ERL_NIF_USEC) - __start)
#else
#define debug(...)
#define start_timing()
#define elapsed_microseconds() 0
#endif
#define error(...) \
do { \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
-#define GET_CTX(env, arg, ctx) \
- do { \
- ExilePriv *data = enif_priv_data(env); \
- if (enif_get_resource(env, arg, data->exec_ctx_rt, (void **)&ctx) == \
- false) { \
- return make_error(env, ATOM_INVALID_CTX); \
- } \
- } while (0);
-
-static const int CMD_EXIT = -1;
static const int UNBUFFERED_READ = -1;
static const int PIPE_BUF_SIZE = 65535;
+static const int FD_CLOSED = -1;
+
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_UNDEFINED;
-static ERL_NIF_TERM ATOM_INVALID_CTX;
+static ERL_NIF_TERM ATOM_INVALID_FD;
static ERL_NIF_TERM ATOM_PIPE_CLOSED;
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ALLOC_FAILED;
+static ERL_NIF_TERM ATOM_SIGKILL;
+static ERL_NIF_TERM ATOM_SIGTERM;
+
/* command exit types */
static ERL_NIF_TERM ATOM_EXIT;
static ERL_NIF_TERM ATOM_SIGNALED;
static ERL_NIF_TERM ATOM_STOPPED;
-enum exec_status {
- SUCCESS,
- PIPE_CREATE_ERROR,
- PIPE_FLAG_ERROR,
- FORK_ERROR,
- PIPE_DUP_ERROR,
- NULL_DEV_OPEN_ERROR,
-};
-
enum exit_type { NORMAL_EXIT, SIGNALED, STOPPED };
-typedef struct ExilePriv {
- ErlNifResourceType *exec_ctx_rt;
- ErlNifResourceType *io_rt;
-} ExilePriv;
-
-typedef struct ExecContext {
- int cmd_input_fd;
- int cmd_output_fd;
- int exit_status; // can be exit status or signal number depending on exit_type
- enum exit_type exit_type;
- pid_t pid;
- // these are to hold enif_select resource objects
- int *read_resource;
- int *write_resource;
-} ExecContext;
-
-typedef struct StartProcessResult {
- bool success;
- int err;
- ExecContext context;
-} StartProcessResult;
-
-/* TODO: assert if the external process is exit (?) */
-static void exec_ctx_dtor(ErlNifEnv *env, void *obj) {
- ExecContext *ctx = obj;
- enif_release_resource(ctx->read_resource);
- enif_release_resource(ctx->write_resource);
- debug("Exile exec_ctx_dtor called");
-}
-
-static void exec_ctx_stop(ErlNifEnv *env, void *obj, int fd,
- int is_direct_call) {
- debug("Exile exec_ctx_stop called");
-}
-
-static void exec_ctx_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
- ErlNifMonitor *monitor) {
- debug("Exile exec_ctx_down called");
+static void close_fd(int *fd) {
+ if (*fd != FD_CLOSED) {
+ close(*fd);
+ *fd = FD_CLOSED;
+ }
}
-static ErlNifResourceTypeInit exec_ctx_rt_init = {exec_ctx_dtor, exec_ctx_stop,
- exec_ctx_down};
-
static void io_resource_dtor(ErlNifEnv *env, void *obj) {
+ int *fd = (int *)obj;
+ close_fd(fd);
debug("Exile io_resource_dtor called");
}
static void io_resource_stop(ErlNifEnv *env, void *obj, int fd,
int is_direct_call) {
debug("Exile io_resource_stop called %d", fd);
}
static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
+ int *fd = (int *)obj;
+ close_fd(fd);
debug("Exile io_resource_down called");
}
static ErlNifResourceTypeInit io_rt_init = {io_resource_dtor, io_resource_stop,
io_resource_down};
static ErlNifResourceType *FD_RT;
static inline ERL_NIF_TERM make_ok(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_OK, term);
}
static inline ERL_NIF_TERM make_error(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_ERROR, term);
}
/* Report to the scheduler how much of the timeslice this call used.
 *
 * `start` and `stop` are expected to be monotonic timestamps in microseconds.
 * The elapsed time is divided by 10 to obtain the value handed to
 * enif_consume_timeslice(); values above 100 are capped at 100 and a value of
 * exactly 0 is raised to 1.
 */
static void notify_consumed_timeslice(ErlNifEnv *env, ErlNifTime start,
                                      ErlNifTime stop) {
  ErlNifTime percent = (ErlNifTime)((stop - start) / 10);

  if (percent == 0) {
    percent = 1;
  } else if (percent > 100) {
    percent = 100;
  }

  enif_consume_timeslice(env, percent);
}
-static int select_write_async(ErlNifEnv *env, int *fd) {
+static int select_write(ErlNifEnv *env, int *fd) {
int ret =
enif_select(env, *fd, ERL_NIF_SELECT_WRITE, fd, NULL, ATOM_UNDEFINED);
if (ret != 0)
perror("select_write()");
return ret;
}
static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 2)
- enif_make_badarg(env);
+ return enif_make_badarg(env);
ErlNifTime start;
ssize_t size;
ErlNifBinary bin;
int write_errno;
int *fd;
start = enif_monotonic_time(ERL_NIF_USEC);
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
- return make_error(env, ATOM_INVALID_CTX);
+ return make_error(env, ATOM_INVALID_FD);
if (enif_inspect_binary(env, argv[1], &bin) != true)
return enif_make_badarg(env);
if (bin.size == 0)
return enif_make_badarg(env);
/* should we limit the bin.size here? */
size = write(*fd, bin.data, bin.size);
write_errno = errno;
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (size >= (ssize_t)bin.size) { // request completely satisfied
return make_ok(env, enif_make_int(env, size));
} else if (size >= 0) { // request partially satisfied
- int retval = select_write_async(env, fd);
+ int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, enif_make_int(env, size));
} else if (write_errno == EAGAIN || write_errno == EWOULDBLOCK) { // busy
- int retval = select_write_async(env, fd);
+ int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else { // Error
perror("write()");
return make_error(env, enif_make_int(env, write_errno));
}
}
-static int select_read_async(ErlNifEnv *env, int *fd) {
+static int select_read(ErlNifEnv *env, int *fd) {
int ret =
enif_select(env, *fd, ERL_NIF_SELECT_READ, fd, NULL, ATOM_UNDEFINED);
if (ret != 0)
perror("select_read()");
return ret;
}
static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 1)
- enif_make_badarg(env);
+ return enif_make_badarg(env);
ERL_NIF_TERM term;
int *fd;
fd = enif_alloc_resource(FD_RT, sizeof(int));
if (!enif_get_int(env, argv[0], fd))
goto error_exit;
term = enif_make_resource(env, fd);
enif_release_resource(fd);
return make_ok(env, term);
error_exit:
enif_release_resource(fd);
return ATOM_ERROR;
}
-static ERL_NIF_TERM nif_read_async(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
+static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc,
+ const ERL_NIF_TERM argv[]) {
if (argc != 2)
- enif_make_badarg(env);
+ return enif_make_badarg(env);
ErlNifTime start;
int size, demand;
int *fd;
start = enif_monotonic_time(ERL_NIF_USEC);
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
- return make_error(env, ATOM_INVALID_CTX);
+ return make_error(env, ATOM_INVALID_FD);
if (!enif_get_int(env, argv[1], &demand))
return enif_make_badarg(env);
size = demand;
if (demand == UNBUFFERED_READ) {
size = PIPE_BUF_SIZE;
} else if (demand < 1) {
- enif_make_badarg(env);
+ return enif_make_badarg(env);
} else if (demand > PIPE_BUF_SIZE) {
size = PIPE_BUF_SIZE;
}
unsigned char buf[size];
ssize_t result = read(*fd, buf, size);
int read_errno = errno;
ERL_NIF_TERM bin_term = 0;
if (result >= 0) {
/* no need to release this binary */
unsigned char *temp = enif_make_new_binary(env, result, &bin_term);
memcpy(temp, buf, result);
}
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (result >= 0) {
/* we do not 'select' if demand completely satisfied OR EOF OR its
* UNBUFFERED_READ */
if (result == demand || result == 0 || demand == UNBUFFERED_READ) {
return make_ok(env, bin_term);
} else { // demand partially satisfied
- int retval = select_read_async(env, fd);
+ int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, bin_term);
}
} else {
if (read_errno == EAGAIN || read_errno == EWOULDBLOCK) { // busy
- int retval = select_read_async(env, fd);
+ int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else { // Error
perror("read()");
return make_error(env, enif_make_int(env, read_errno));
}
}
}
static ERL_NIF_TERM nif_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 1)
- enif_make_badarg(env);
+ return enif_make_badarg(env);
- ErlNifTime start;
int *fd;
- start = enif_monotonic_time(ERL_NIF_USEC);
-
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
- return make_error(env, ATOM_INVALID_CTX);
+ return make_error(env, ATOM_INVALID_FD);
- close(*fd);
+ close_fd(fd);
- notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
return ATOM_OK;
}
-static ERL_NIF_TERM is_alive(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
-
- if (ctx->pid == CMD_EXIT)
- return make_ok(env, ATOM_TRUE);
+static ERL_NIF_TERM nif_is_os_pid_alive(ErlNifEnv *env, int argc,
+ const ERL_NIF_TERM argv[]) {
+ pid_t pid;
- int result = kill(ctx->pid, 0);
+ if (argc != 1)
+ return enif_make_badarg(env);
- if (result == 0) {
- return make_ok(env, ATOM_TRUE);
- } else {
- return make_ok(env, ATOM_FALSE);
- }
-}
+ // we should not assume pid type to be `int`?
+ if (!enif_get_int(env, argv[0], (int *)&pid))
+ return enif_make_badarg(env);
-static ERL_NIF_TERM sys_terminate(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
- if (ctx->pid == CMD_EXIT)
- return make_ok(env, enif_make_int(env, 0));
+ int result = kill(pid, 0);
- return make_ok(env, enif_make_int(env, kill(ctx->pid, SIGTERM)));
+ if (result == 0)
+ return ATOM_TRUE;
+ else
+ return ATOM_FALSE;
}
-static ERL_NIF_TERM sys_kill(ErlNifEnv *env, int argc,
+static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
- if (ctx->pid == CMD_EXIT)
- return make_ok(env, enif_make_int(env, 0));
+ pid_t pid;
+ int ret;
- return make_ok(env, enif_make_int(env, kill(ctx->pid, SIGKILL)));
-}
+ if (argc != 2)
+ return enif_make_badarg(env);
-static ERL_NIF_TERM make_exit_term(ErlNifEnv *env, ExecContext *ctx) {
- switch (ctx->exit_type) {
- case NORMAL_EXIT:
- return make_ok(env, enif_make_tuple2(env, ATOM_EXIT,
- enif_make_int(env, ctx->exit_status)));
- case SIGNALED:
- /* exit_status here points to signal number */
- return make_ok(env, enif_make_tuple2(env, ATOM_SIGNALED,
- enif_make_int(env, ctx->exit_status)));
- case STOPPED:
- return make_ok(env, enif_make_tuple2(env, ATOM_STOPPED,
- enif_make_int(env, ctx->exit_status)));
- default:
- error("Invalid wait status");
- return make_error(env, ATOM_UNDEFINED);
- }
-}
+ // we should not assume pid type to be `int`?
+ if (!enif_get_int(env, argv[0], (int *)&pid))
+ return enif_make_badarg(env);
-static ERL_NIF_TERM sys_wait(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
-
- if (ctx->pid == CMD_EXIT)
- return make_exit_term(env, ctx);
-
- int status;
- int wpid = waitpid(ctx->pid, &status, WNOHANG);
-
- if (wpid == ctx->pid) {
- ctx->pid = CMD_EXIT;
-
- if (WIFEXITED(status)) {
- ctx->exit_type = NORMAL_EXIT;
- ctx->exit_status = WEXITSTATUS(status);
- } else if (WIFSIGNALED(status)) {
- ctx->exit_type = SIGNALED;
- ctx->exit_status = WTERMSIG(status);
- } else if (WIFSTOPPED(status)) {
- ctx->exit_type = STOPPED;
- ctx->exit_status = 0;
- }
+ if (enif_compare(argv[1], ATOM_SIGKILL) == 0)
+ ret = kill(pid, SIGKILL);
+ else if (enif_compare(argv[1], ATOM_SIGTERM) == 0)
+ ret = kill(pid, SIGTERM);
+ else
+ return enif_make_badarg(env);
- return make_exit_term(env, ctx);
- } else if (wpid != 0) {
- perror("waitpid()");
+ if (ret != 0) {
+ perror("[exile] failed to send signal");
+ return make_error(
+ env, enif_make_string(env, "failed to send signal", ERL_NIF_LATIN1));
}
- ERL_NIF_TERM term = enif_make_tuple2(env, enif_make_int(env, wpid),
- enif_make_int(env, status));
- return make_error(env, term);
-}
-
-static ERL_NIF_TERM os_pid(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
- if (ctx->pid == CMD_EXIT)
- return make_ok(env, enif_make_int(env, 0));
- return make_ok(env, enif_make_int(env, ctx->pid));
+ return ATOM_OK;
}
-static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
- struct ExilePriv *data = enif_alloc(sizeof(struct ExilePriv));
- if (!data)
- return 1;
-
- data->exec_ctx_rt =
- enif_open_resource_type_x(env, "exile_resource", &exec_ctx_rt_init,
- ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
- data->io_rt =
- enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
- ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
+/* static ERL_NIF_TERM make_exit_term(ErlNifEnv *env, ExecContext *ctx) { */
+/* switch (ctx->exit_type) { */
+/* case NORMAL_EXIT: */
+/* return make_ok(env, enif_make_tuple2(env, ATOM_EXIT, */
+/* enif_make_int(env,
+ * ctx->exit_status))); */
+/* case SIGNALED: */
+/* /\* exit_status here points to signal number *\/ */
+/* return make_ok(env, enif_make_tuple2(env, ATOM_SIGNALED, */
+/* enif_make_int(env,
+ * ctx->exit_status))); */
+/* case STOPPED: */
+/* return make_ok(env, enif_make_tuple2(env, ATOM_STOPPED, */
+/* enif_make_int(env,
+ * ctx->exit_status))); */
+/* default: */
+/* error("Invalid wait status"); */
+/* return make_error(env, ATOM_UNDEFINED); */
+/* } */
+/* } */
+static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
FD_RT =
enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
ATOM_TRUE = enif_make_atom(env, "true");
ATOM_FALSE = enif_make_atom(env, "false");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_UNDEFINED = enif_make_atom(env, "undefined");
- ATOM_INVALID_CTX = enif_make_atom(env, "invalid_exile_exec_ctx");
+ ATOM_INVALID_FD = enif_make_atom(env, "invalid_fd_resource");
ATOM_PIPE_CLOSED = enif_make_atom(env, "closed_pipe");
ATOM_EXIT = enif_make_atom(env, "exit");
ATOM_SIGNALED = enif_make_atom(env, "signaled");
ATOM_STOPPED = enif_make_atom(env, "stopped");
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ALLOC_FAILED = enif_make_atom(env, "alloc_failed");
- *priv = (void *)data;
+ ATOM_SIGTERM = enif_make_atom(env, "sigterm");
+ ATOM_SIGKILL = enif_make_atom(env, "sigkill");
return 0;
}
/* NIF unload callback.
 *
 * `priv` is whatever the load callback stored through its `priv`
 * out-parameter. It may be NULL when no private data was allocated, so it is
 * only freed when present.
 */
static void on_unload(ErlNifEnv *env, void *priv) {
  debug("exile unload");
  if (priv != NULL)
    enif_free(priv);
}
static ErlNifFunc nif_funcs[] = {
- {"sys_terminate", 1, sys_terminate, USE_DIRTY_IO},
- {"sys_wait", 1, sys_wait, USE_DIRTY_IO},
- {"sys_kill", 1, sys_kill, USE_DIRTY_IO},
- {"nif_read_async", 2, nif_read_async, USE_DIRTY_IO},
+ {"nif_read", 2, nif_read, USE_DIRTY_IO},
{"nif_create_fd", 1, nif_create_fd, USE_DIRTY_IO},
{"nif_write", 2, nif_write, USE_DIRTY_IO},
{"nif_close", 1, nif_close, USE_DIRTY_IO},
- {"alive?", 1, is_alive, USE_DIRTY_IO},
- {"os_pid", 1, os_pid, USE_DIRTY_IO},
-};
+ {"nif_is_os_pid_alive", 1, nif_is_os_pid_alive, USE_DIRTY_IO},
+ {"nif_kill", 2, nif_kill, USE_DIRTY_IO}};
ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
diff --git a/c_src/spawner.c b/c_src/spawner.c
index 7786e04..58df12b 100644
--- a/c_src/spawner.c
+++ b/c_src/spawner.c
@@ -1,186 +1,188 @@
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>
static const int PIPE_READ = 0;
static const int PIPE_WRITE = 1;
/* We are choosing an exit code which is not reserved see:
* https://www.tldp.org/LDP/abs/html/exitcodes.html. */
static const int FORK_EXEC_FAILURE = 125;
static int set_flag(int fd, int flags) {
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
}
/* Send two file descriptors to the peer connected on `socket` as a single
 * SCM_RIGHTS control message.
 *
 * The descriptors are packed in the order {read_fd, write_fd}. The message
 * also carries a 256-byte data payload; it is zero-filled so that no
 * uninitialised stack bytes are written to the socket.
 *
 * Returns EXIT_SUCCESS when sendmsg() succeeds, EXIT_FAILURE otherwise.
 */
static int send_io_fds(int socket, int read_fd, int write_fd) {
  struct msghdr msg = {0};
  struct cmsghdr *cmsg;
  int fds[2] = {read_fd, write_fd};
  char buf[CMSG_SPACE(2 * sizeof(int))];
  char dup[256];

  memset(buf, '\0', sizeof(buf));
  memset(dup, '\0', sizeof(dup));

  struct iovec io = {.iov_base = dup, .iov_len = sizeof(dup)};

  msg.msg_iov = &io;
  msg.msg_iovlen = 1;
  msg.msg_control = buf;
  msg.msg_controllen = sizeof(buf);

  cmsg = CMSG_FIRSTHDR(&msg);
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SCM_RIGHTS;
  cmsg->cmsg_len = CMSG_LEN(2 * sizeof(int));

  memcpy(CMSG_DATA(cmsg), fds, sizeof(fds));

  if (sendmsg(socket, &msg, 0) < 0) {
    printf("Failed to send message");
    return EXIT_FAILURE;
  }

  return EXIT_SUCCESS;
}
/* This is not ideal, but as of now there is no portable way to do this */
+
+/*
static void close_all_fds() {
int fd_limit = (int)sysconf(_SC_OPEN_MAX);
for (int i = STDERR_FILENO + 1; i < fd_limit; i++)
close(i);
}
+*/
/* Close every pipe end recorded in `pipes`.
 *
 * For each of the two pipes the read end is handled before the write end.
 * Only descriptors greater than 0 are closed.
 */
static void close_all(int pipes[2][2]) {
  const int ends[2] = {PIPE_READ, PIPE_WRITE};

  for (int p = 0; p < 2; p++) {
    for (int e = 0; e < 2; e++) {
      int fd = pipes[p][ends[e]];
      if (fd > 0)
        close(fd);
    }
  }
}
static int exec_process(char const *bin, char *const *args, int socket) {
- pid_t pid;
int pipes[2][2] = {{0, 0}, {0, 0}};
if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1) {
perror("[spawner] failed to create pipes");
close_all(pipes);
return 1;
}
printf("[spawner] pipe: done\r\n");
const int r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
const int w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
const int r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
const int w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0) {
perror("[spawner] failed to set flags for pipes");
close_all(pipes);
return 1;
}
printf("[spawner] set_flag done\r\n");
if (send_io_fds(socket, w_cmdin, r_cmdout) != EXIT_SUCCESS) {
perror("[spawner] failed to send fd via socket");
close_all(pipes);
return 1;
}
printf("[spawner] send_io_fds: done\r\n");
// TODO: close all fds including socket
// close_all_fds();
printf("[spawner] w_cmdin: %d r_cmdout: %d\r\n", w_cmdin, w_cmdout);
printf("[spawner] argv: %s\r\n", args[1]);
close(w_cmdin);
close(r_cmdout);
close(STDIN_FILENO);
if (dup2(r_cmdin, STDIN_FILENO) < 0) {
perror("[spawner] failed to dup to stdin");
_exit(FORK_EXEC_FAILURE);
}
close(STDOUT_FILENO);
if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
perror("[spawner] failed to dup to stdout");
_exit(FORK_EXEC_FAILURE);
}
execvp(bin, args);
perror("[spawner] execvp(): failed");
_exit(FORK_EXEC_FAILURE);
}
/* Connect to the UNIX domain socket at `sock_path` and hand over to
 * exec_process(), which is given the connected socket together with the
 * program `bin` and its argument vector `args`.
 *
 * Returns EXIT_FAILURE when the socket cannot be created or connected (or the
 * path does not fit in sockaddr_un.sun_path), and -1 when exec_process()
 * reports a failure. The socket is closed on every failure path.
 */
static int handle(const char *sock_path, const char *bin, char *const *args) {
  int sfd;
  struct sockaddr_un addr;

  printf("[spawner] hadle\r\n");

  /* strncpy() below would silently truncate an over-long path and connect to
   * a different socket, so reject it up front. */
  if (strlen(sock_path) >= sizeof(addr.sun_path)) {
    printf("Socket path is too long");
    return EXIT_FAILURE;
  }

  sfd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (sfd == -1) {
    printf("Failed to create socket");
    return EXIT_FAILURE;
  }

  memset(&addr, 0, sizeof(struct sockaddr_un));
  addr.sun_family = AF_UNIX;
  strncpy(addr.sun_path, sock_path, sizeof(addr.sun_path) - 1);

  printf("[spawner] connect\r\n");
  if (connect(sfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) ==
      -1) {
    printf("Failed to connect to socket");
    close(sfd);
    return EXIT_FAILURE;
  }

  printf("[spawner] exec_process\r\n");
  if (exec_process(bin, args, sfd) != 0) {
    close(sfd);
    return -1;
  }

  // we never reach here
  return 0;
}
/* Entry point.
 *
 * Expected invocation: <spawner> <socket-path> <program> [program-args...]
 * argv[1] is passed to handle() as the socket path, argv[2] as the program,
 * and argv[2..] (NULL-terminated copy) as the program's argument vector.
 * Exits with EXIT_FAILURE when fewer than two arguments are given or the
 * argument vector cannot be allocated; otherwise with handle()'s result.
 */
int main(int argc, const char *argv[]) {
  int status = EXIT_SUCCESS;
  const char **proc_argv;

  printf("[spawner] argc: %d\r\n", argc);

  if (argc < 3) {
    printf("[spawner] exit\r\n");
    status = EXIT_FAILURE;
  } else {
    /* argc - 2 program arguments plus the terminating NULL */
    proc_argv = malloc((argc - 2 + 1) * sizeof(char *));
    if (proc_argv == NULL) {
      perror("[spawner] failed to allocate argument vector");
      exit(EXIT_FAILURE);
    }

    int i;
    for (i = 2; i < argc; i++)
      proc_argv[i - 2] = argv[i];
    proc_argv[i - 2] = NULL;

    printf("[spawner] exec: %s\r\n", argv[2]);
    status = handle(argv[1], argv[2], (char *const *)proc_argv);

    /* only reached when handle() failed to exec the program */
    free(proc_argv);
  }

  exit(status);
}
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index 2a15ec5..f82f2d5 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,458 +1,480 @@
defmodule Exile.Process do
@moduledoc """
GenServer which wraps spawned external command.
One should use `Exile.stream!` over `Exile.Process`. stream internally manages this server for you. Use this only if you need more control over the life-cycle OS process.
## Overview
`Exile.Process` is an alternative primitive for Port. It has different interface and approach to running external programs to solve the issues associated with the ports.
### When compared to Port
* it is demand driven. The user explicitly has to `read` the output of the command, and the progress of the external command is controlled using OS pipes. So unlike Port, this never causes memory issues in the BEAM by loading more than we can consume
* it can close stdin of the program explicitly
* does not create zombie process. It always tries to cleanup resources
At a high level it makes non-blocking asynchronous system calls to execute and interact with the external program. It completely bypasses the BEAM implementation for the same using a NIF. It uses the `select()` system call for asynchronous IO. Most of the system calls are non-blocking, so it does not have an adverse effect on the scheduler, avoiding issues such as "scheduler collapse".
"""
alias Exile.ProcessNif, as: Nif
require Logger
use GenServer
# delay between exit_check when io is busy (in milliseconds)
@exit_check_timeout 5
@default_opts [env: []]
@doc """
Starts `Exile.ProcessServer`
Starts external program using `cmd_with_args` with options `opts`
`cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
* `cd` - the directory to run the command in
* `env` - an enumerable of tuples containing environment key-value. These can be accessed in the external program
"""
# Merges the caller's options over @default_opts, validates/normalizes the
# command and options, and starts the server with the normalized arguments.
# When normalization fails, the non-matching value from normalize_args/2 is
# returned unchanged by the `with`.
# NOTE(review): this calls GenServer.start/2, so the server is not linked to
# the caller despite the function name - confirm this is intended.
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
with {:ok, args} <- normalize_args(cmd_with_args, opts) do
GenServer.start(__MODULE__, args)
end
end
# Client API. Every function below is a synchronous GenServer.call to
# `process` with an :infinity call timeout.

# Asks the server to close the stdin of the external program.
def close_stdin(process) do
GenServer.call(process, :close_stdin, :infinity)
end
# Flattens `iodata` into a single binary and sends it as a write request.
def write(process, iodata) do
GenServer.call(process, {:write, IO.iodata_to_binary(iodata)}, :infinity)
end
# Requests a read of `size`, which must be a positive integer or :unbuffered.
def read(process, size) when (is_integer(size) and size > 0) or size == :unbuffered do
GenServer.call(process, {:read, size}, :infinity)
end
# Same as read(process, :unbuffered).
def read(process) do
GenServer.call(process, {:read, :unbuffered}, :infinity)
end
# Requests that `signal` be sent; only :sigkill and :sigterm are accepted.
def kill(process, signal) when signal in [:sigkill, :sigterm] do
GenServer.call(process, {:kill, signal}, :infinity)
end
# `timeout` is forwarded to the server inside the request; the call itself
# always waits with :infinity.
def await_exit(process, timeout \\ :infinity) do
GenServer.call(process, {:await_exit, timeout}, :infinity)
end
# Asks the server for the OS pid.
def os_pid(process) do
GenServer.call(process, :os_pid, :infinity)
end
# Asks the server to stop.
def stop(process), do: GenServer.call(process, :stop, :infinity)
## Server
defmodule Pending do
defstruct bin: [], remaining: 0, client_pid: nil
end
defstruct [
:args,
:errno,
:port,
:socket_path,
:stdin,
:stdout,
:context,
:status,
await: %{},
pending_read: nil,
pending_write: nil
]
alias __MODULE__
# Builds the initial server state in the :init status with no waiters and
# empty pending read/write slots, then defers the remaining startup work to
# handle_continue/2.
def init(args) do
  initial_state =
    %__MODULE__{
      args: args,
      errno: nil,
      status: :init,
      await: %{},
      pending_read: %Pending{},
      pending_write: %Pending{}
    }

  {:ok, initial_state, {:continue, nil}}
end
def handle_continue(nil, state) do
%{cmd_with_args: cmd_with_args, cd: cd, env: env} = state.args
socket_path = socket_path()
- Exile.Watcher.watch(self(), socket_path)
{:ok, uds} = :socket.open(:local, :stream, :default)
_ = :file.delete(socket_path)
{:ok, _} = :socket.bind(uds, %{family: :local, path: socket_path})
:ok = :socket.listen(uds)
port = exec(cmd_with_args, socket_path, env, cd)
+ {:os_pid, os_pid} = Port.info(port, :os_pid)
+ Exile.Watcher.watch(self(), os_pid, socket_path)
{write_fd_int, read_fd_int} = receive_fds(uds)
{:ok, write_fd} = Nif.nif_create_fd(write_fd_int)
{:ok, read_fd} = Nif.nif_create_fd(read_fd_int)
{:noreply,
%Process{
state
| port: port,
status: :start,
socket_path: socket_path,
stdin: read_fd,
stdout: write_fd
}}
end
def handle_call(:stop, _from, state) do
# watcher will take care of termination of external process
# TODO: pending write and read should receive "stopped" return
# value instead of exit signal
+ Port.close(state.port)
+
{:stop, :normal, :ok, state}
end
# Once the recorded status is {:exit, status}, any request reaching this
# clause is answered with that exit status instead of being dispatched to the
# clauses below. The state must be part of the reply tuple: a two-element
# {:reply, reply} is not a valid handle_call/3 return value.
def handle_call(_, _from, %{status: {:exit, status}} = state),
  do: {:reply, {:error, {:exit, status}}, state}
# Registers `from` as a waiter for the exit status. For a finite `timeout` a
# timer is armed that delivers {:await_exit_timeout, from} to this server; the
# timer reference (or nil for :infinity) is stored under the :timeout key
# before the first exit check runs.
def handle_call({:await_exit, timeout}, from, state) do
  timeout_ref =
    case timeout do
      :infinity -> nil
      _ -> Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
    end

  state
  |> put_timer(from, :timeout, timeout_ref)
  |> check_exit(from)
end
# Records the binary and the caller as the pending write, then attempts the
# write; the reply is produced by do_write/1.
def handle_call({:write, binary}, from, state) when is_binary(binary) do
pending = %Pending{bin: binary, client_pid: from}
do_write(%Process{state | pending_write: pending})
end
# Records the requested size and the caller as the pending read, then attempts
# the read; the reply is produced by do_read/1.
def handle_call({:read, size}, from, state) do
pending = %Pending{remaining: size, client_pid: from}
do_read(%Process{state | pending_read: pending})
end
# Closes the descriptor stored in state.stdin.
def handle_call(:close_stdin, _from, state), do: do_close(state, :stdin)
- def handle_call(:os_pid, _from, state), do: {:reply, Nif.os_pid(state.context), state}
+ def handle_call(:os_pid, _from, state) do
+ case Port.info(state.port, :os_pid) do
+ {:os_pid, os_pid} ->
+ {:reply, {:ok, os_pid}, state}
+
+ :undefined ->
+ Logger.debug("Process not alive")
+ {:reply, :undefined, state}
+ end
+ end
+
+ def handle_call({:kill, signal}, _from, state) do
+ {:reply, signal(state.port, signal), state}
+ end
def handle_info({:check_exit, from}, state), do: check_exit(state, from)
# The await_exit timeout for `from` fired before an exit status was recorded.
def handle_info({:await_exit_timeout, from}, state) do
# stop the periodic exit check for this waiter
cancel_timer(state, from, :check)
# drop a {:check_exit, from} message that may already be in the mailbox;
# `after 0` makes this a non-blocking check
receive do
{:check_exit, ^from} -> :ok
after
0 -> :ok
end
GenServer.reply(from, :timeout)
{:noreply, clear_await(state, from)}
end
def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
def handle_info({:select, _read_resource, _ref, :ready_input}, state), do: do_read(state)
def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state),
do: handle_port_exit(exit_status, state)
# Any message not handled by the clauses above is treated as a bug and crashes
# the server. raise/1 only accepts a string, a module, or an exception struct,
# so the unexpected term is rendered into the message instead of being passed
# to raise/1 directly.
def handle_info(msg, _state), do: raise("unexpected message: #{inspect(msg)}")
defp handle_port_exit(exit_status, state) do
{:noreply, %{state | status: {:exit, exit_status}}}
end
# Nothing left to write: acknowledge the caller and reset the pending write.
defp do_write(%Process{pending_write: %Pending{bin: <<>>}} = state) do
GenServer.reply(state.pending_write.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
# Attempts to write the pending binary to state.stdin through the NIF.
defp do_write(%Process{pending_write: pending} = state) do
case Nif.nif_write(state.stdin, pending.bin) do
{:ok, size} ->
if size < byte_size(pending.bin) do
# partial write: keep the unwritten tail as the pending binary and do not
# reply yet; do_write/1 is invoked again from the :ready_output handler
binary = binary_part(pending.bin, size, byte_size(pending.bin) - size)
{:noreply, %{state | pending_write: %Pending{pending | bin: binary}}}
else
GenServer.reply(pending.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
# nothing could be written right now: keep the pending write as is
{:error, :eagain} ->
{:noreply, state}
# any other error is reported to the caller and remembered in state.errno
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | errno: errno}}
end
end
defp do_read(%Process{pending_read: %Pending{remaining: :unbuffered} = pending} = state) do
- case Nif.nif_read_async(state.stdout, -1) do
+ case Nif.nif_read(state.stdout, -1) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, []})
{:noreply, state}
{:ok, binary} ->
GenServer.reply(pending.client_pid, {:ok, binary})
{:noreply, state}
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | errno: errno}}
end
end
defp do_read(%Process{pending_read: pending} = state) do
- case Nif.nif_read_async(state.stdout, pending.remaining) do
+ case Nif.nif_read(state.stdout, pending.remaining) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, pending.bin})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
if byte_size(binary) < pending.remaining do
pending = %Pending{
pending
| bin: [pending.bin | binary],
remaining: pending.remaining - byte_size(binary)
}
{:noreply, %Process{state | pending_read: pending}}
else
GenServer.reply(pending.client_pid, {:ok, [state.pending_read.bin | binary]})
{:noreply, %Process{state | pending_read: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | pending_read: %Pending{}, errno: errno}}
end
end
# Replies to the waiter `from` once an exit status has been recorded;
# otherwise schedules another check after @exit_check_timeout milliseconds.
# NOTE(review): only {:exit, _} and :start are matched; any other status
# (e.g. :init) would raise a CaseClauseError here.
defp check_exit(state, from) do
case state.status do
# {:ok, {:exit, Nif.fork_exec_failure()}} ->
# GenServer.reply(from, {:error, :failed_to_execute})
# cancel_timer(state, from, :timeout)
# {:noreply, clear_await(state, from)}
{:exit, status} ->
GenServer.reply(from, {:ok, {:exit, status}})
# the await timeout is no longer needed once the caller has its answer
cancel_timer(state, from, :timeout)
{:noreply, clear_await(state, from)}
:start ->
# not exited yet: poll again and remember the timer under the :check key
tref = Elixir.Process.send_after(self(), {:check_exit, from}, @exit_check_timeout)
{:noreply, put_timer(state, from, :check, tref)}
end
end
- defp do_kill(context, :sigkill), do: Nif.sys_kill(context)
-
- defp do_kill(context, :sigterm), do: Nif.sys_terminate(context)
-
# Closes the descriptor selected by `type` (:stdin selects state.stdin, any
# other value selects state.stdout) and replies to the caller.
#
# Replies :ok on success. On {:error, errno} the error is returned to the
# caller and remembered in state.errno. Previously this branch called raise/1
# with the raw error term, which made the reply tuple after it unreachable.
defp do_close(state, type) do
  fd =
    if type == :stdin do
      state.stdin
    else
      state.stdout
    end

  case Nif.nif_close(fd) do
    :ok ->
      {:reply, :ok, state}

    {:error, errno} ->
      {:reply, {:error, errno}, %Process{state | errno: errno}}
  end
end
defp clear_await(state, from) do
%Process{state | await: Map.delete(state.await, from)}
end
# Cancels the timer stored for `from` under `key`, if there is one.
# Returns :ok when no timer is stored, otherwise the result of
# Process.cancel_timer/1.
defp cancel_timer(state, from, key) do
  timer_ref = get_timer(state, from, key)

  if is_nil(timer_ref) do
    :ok
  else
    Elixir.Process.cancel_timer(timer_ref)
  end
end
# Stores `timer` under `key` in the timer map kept for the waiter `from`.
#
# The entry for `from` is created when missing. Entries belonging to other
# waiters are always preserved; previously registering a new waiter replaced
# the whole `await` map and silently dropped everyone else's timers.
defp put_timer(state, from, key, timer) do
  timers =
    state.await
    |> Map.get(from, %{})
    |> Map.put(key, timer)

  %Process{state | await: Map.put(state.await, from, timers)}
end
defp get_timer(state, from, key), do: get_in(state.await, [from, key])
# Resolves `cmd` to the path of an executable.
# Returns {:ok, path_as_charlist} or {:error, message} when it is not found.
defp normalize_cmd(cmd) do
  case System.find_executable(cmd) do
    nil -> {:error, "command not found: #{inspect(cmd)}"}
    executable -> {:ok, to_charlist(executable)}
  end
end
# Converts every command argument to a charlist.
# Anything that is not a list is rejected with an error message.
defp normalize_cmd_args(args) when is_list(args) do
  {:ok, Enum.map(args, &to_charlist/1)}
end

defp normalize_cmd_args(args) do
  {:error, "command arguments must be list of strings. #{inspect(args)}"}
end
# No directory given: use an empty charlist.
defp normalize_cd(nil), do: {:ok, ''}

# A given directory must exist and be a directory; it is returned as a
# charlist.
defp normalize_cd(cd) do
  valid_dir? = File.exists?(cd) && File.dir?(cd)

  case valid_dir? do
    true -> {:ok, to_charlist(cd)}
    false -> {:error, "`:cd` must be valid directory path"}
  end
end
# No environment given: nothing to pass on.
defp normalize_env(nil), do: {:ok, []}

# Builds the environment as a list of "KEY=VALUE" charlists.
# User-supplied keys and values are trimmed and take precedence over the
# variables currently returned by System.get_env/0.
defp normalize_env(env) do
  overrides =
    env
    |> Enum.map(fn {key, value} -> {String.trim(key), String.trim(value)} end)
    |> Map.new()

  # spawned process env will be beam env at that time + user env.
  # this is similar to erlang behavior
  merged = Map.merge(System.get_env(), overrides)

  entries = Enum.map(merged, fn {name, val} -> to_charlist(name <> "=" <> val) end)

  {:ok, entries}
end
# Accepts only the :cd and :env options.
# Returns :ok, or {:error, message} listing every other option that was given.
defp validate_opts_fields(opts) do
  case Keyword.drop(opts, [:cd, :env]) do
    [] -> :ok
    unknown -> {:error, "invalid opts: #{inspect(unknown)}"}
  end
end
# Validates and normalizes the command, its arguments and the options.
# Returns {:ok, %{cmd_with_args: ..., cd: ..., env: ...}} when every step
# succeeds; the first step that does not match its pattern is returned as is
# (the helpers return {:error, message} tuples).
defp normalize_args([cmd | args], opts) when is_list(opts) do
with {:ok, cmd} <- normalize_cmd(cmd),
{:ok, args} <- normalize_cmd_args(args),
:ok <- validate_opts_fields(opts),
{:ok, cd} <- normalize_cd(opts[:cd]),
{:ok, env} <- normalize_env(opts[:env]) do
{:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env}}
end
end
# Anything other than a non-empty list with list options is rejected.
defp normalize_args(_, _), do: {:error, "invalid arguments"}
@spawner_path Path.expand("../../c_src/spawner", __DIR__) |> to_charlist()
# Opens a port running the spawner executable at @spawner_path. The socket
# path is passed as the first argument, followed by the command and its
# arguments. The port is opened with :nouse_stdio, :exit_status and :binary.
# NOTE(review): `env` and `cd` are accepted but currently not forwarded to
# the port - the lines that would add them are commented out.
defp exec(cmd_with_args, socket_path, env, cd) do
opts = []
# opts = if cd, do: [cd: cd], else: []
# opts = if env, do: [{:env, env} | opts], else: opts
opts =
[
:nouse_stdio,
:exit_status,
:binary,
args: [socket_path | cmd_with_args]
] ++ opts
Port.open({:spawn_executable, @spawner_path}, opts)
end
# Returns a randomly named path inside the system temporary directory.
defp socket_path do
  System.tmp_dir!()
  |> Path.join(randome_string())
end
defp randome_string do
:crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
end
+ defp signal(port, sig) when sig in [:sigkill, :sigterm] do
+ case Port.info(port, :os_pid) do
+ {:os_pid, os_pid} ->
+ Nif.nif_kill(os_pid, sig)
+
+ :undefined ->
+ {:error, :process_not_alive}
+ end
+ end
+
# Timeout handed to `:socket.accept/2` in the added (`+`) line below.
@timeout 2000
# Accepts one connection on the listening socket `uds`, reads a single message
# from it and extracts two native-endian 32-bit integers from the `:rights`
# control data, then closes the accepted socket.
#
# Returns `{write_fd_int, read_fd_int}`. The order is swapped relative to the
# order in which the two integers appear in the control data.
#
# Any `recvmsg` result other than `{:ok, msg}`, or a message whose control data
# has a different shape, fails the match and raises `MatchError`.
defp receive_fds(uds) do
- case :socket.accept(uds, 5000) do
+ case :socket.accept(uds, @timeout) do
{:ok, usock} ->
{:ok, msg} = :socket.recvmsg(usock)
%{
ctrl: [
%{
data: <<read_fd_int::native-32, write_fd_int::native-32, _rest::binary>>,
level: :socket,
type: :rights
}
]
} = msg
:socket.close(usock)
{write_fd_int, read_fd_int}
error ->
# NOTE(review): `error` is whatever non-`{:ok, _}` term `accept` returned,
# not an exception struct, module or string. Confirm that passing it
# straight to `raise/1` produces the intended error.
raise error
end
end
end
diff --git a/lib/exile/process_nif.ex b/lib/exile/process_nif.ex
index b555ad4..2b67682 100644
--- a/lib/exile/process_nif.ex
+++ b/lib/exile/process_nif.ex
@@ -1,36 +1,30 @@
# NIF bindings. Each `nif_*` function below is an Elixir stub that raises
# `:nif_library_not_loaded`; the stubs are expected to be replaced by the
# native implementations once `load_nifs/0` has loaded the library.
defmodule Exile.ProcessNif do
@moduledoc false
# Run `load_nifs/0` automatically when this module is loaded.
@on_load :load_nifs
# Loads the native library named "exile" from the `:exile` application's priv
# directory. Returns the result of `:erlang.load_nif/2`.
def load_nifs do
nif_path = :filename.join(:code.priv_dir(:exile), "exile")
:erlang.load_nif(nif_path, 0)
end
- def sys_kill(_context), do: :erlang.nif_error(:nif_library_not_loaded)
# Stub for the native liveness check of an OS pid.
+ def nif_is_os_pid_alive(_os_pid), do: :erlang.nif_error(:nif_library_not_loaded)
- def sys_terminate(_context), do: :erlang.nif_error(:nif_library_not_loaded)
# Stub for the native signal delivery. Callers in this patch pass `:sigterm` or
# `:sigkill` as `_signal`.
+ def nif_kill(_os_pid, _signal), do: :erlang.nif_error(:nif_library_not_loaded)
- def sys_wait(_context), do: :erlang.nif_error(:nif_library_not_loaded)
-
- def os_pid(_context), do: :erlang.nif_error(:nif_library_not_loaded)
-
- def alive?(_context), do: :erlang.nif_error(:nif_library_not_loaded)
-
- def nif_read_async(_fd, _request), do: :erlang.nif_error(:nif_library_not_loaded)
# Stub for the native read. NOTE(review): `_request` is presumably the maximum
# number of bytes to read; confirm against the C source.
+ def nif_read(_fd, _request), do: :erlang.nif_error(:nif_library_not_loaded)
# Stubs for wrapping a raw file descriptor, closing it, and writing to it.
def nif_create_fd(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_close(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_write(_fd, _bin), do: :erlang.nif_error(:nif_library_not_loaded)
# non-nif helper functions
# Macros that expand to integer constants.
# NOTE(review): 125 is presumably the exit status reported when fork/exec
# fails; confirm against the C source.
defmacro fork_exec_failure(), do: 125
defmacro nif_false(), do: 0
defmacro nif_true(), do: 1
# Maps a stream name to its conventional file descriptor number.
def to_process_fd(:stdin), do: 0
def to_process_fd(:stdout), do: 1
end
diff --git a/lib/exile/watcher.ex b/lib/exile/watcher.ex
index b449685..7a1d548 100644
--- a/lib/exile/watcher.ex
+++ b/lib/exile/watcher.ex
@@ -1,87 +1,82 @@
# GenServer that monitors one Elixir process. When that process goes down, or
# when the watcher itself receives an exit signal, it removes the socket file
# and tries to stop the external OS process identified by `os_pid`.
defmodule Exile.Watcher do
use GenServer, restart: :temporary
require Logger
- alias Exile.ProcessNif
+ alias Exile.ProcessNif, as: Nif
# Starts the watcher. The match asserts success, so a failed start raises
# `MatchError` in the caller.
def start_link(args) do
{:ok, _pid} = GenServer.start_link(__MODULE__, args)
end
- def watch(pid, context) do
- spec = {Exile.Watcher, %{pid: pid, context: context}}
# Starts a watcher for `pid` as a child of `Exile.WatcherSupervisor` and
# returns the result of `DynamicSupervisor.start_child/2`.
+ def watch(pid, os_pid, socket_path) do
+ spec = {Exile.Watcher, %{pid: pid, os_pid: os_pid, socket_path: socket_path}}
DynamicSupervisor.start_child(Exile.WatcherSupervisor, spec)
end
# Traps exits, so exit signals arrive as `{:EXIT, _, _}` messages, and
# monitors the watched process. The monitor reference is kept in the state so
# the `:DOWN` handler can match on it.
def init(args) do
- %{pid: pid, socket_path: socket_path} = args
+ %{pid: pid, os_pid: os_pid, socket_path: socket_path} = args
Process.flag(:trap_exit, true)
ref = Elixir.Process.monitor(pid)
- {:ok, %{pid: pid, socket_path: socket_path, ref: ref}}
+ {:ok, %{pid: pid, os_pid: os_pid, socket_path: socket_path, ref: ref}}
end
# The watched process went down: remove the socket file, stop the external
# program, then stop normally with the state cleared to `nil`.
# NOTE(review): `File.rm!/1` raises when the file cannot be removed, in which
# case `attempt_graceful_exit/1` is never reached. Confirm this is acceptable.
def handle_info({:DOWN, ref, :process, pid, _reason}, %{
pid: pid,
socket_path: socket_path,
+ os_pid: os_pid,
ref: ref
}) do
File.rm!(socket_path)
- # attempt_graceful_exit(socket_path)
+ attempt_graceful_exit(os_pid)
{:stop, :normal, nil}
end
# An exit signal that arrives when the state is already `nil`: just stop.
def handle_info({:EXIT, _, reason}, nil), do: {:stop, reason, nil}
# When something attempts to kill the watcher, we forcefully kill the external
# OS process. This can happen when the BEAM receives SIGTERM.
# NOTE(review): the same `File.rm!/1` concern as in the `:DOWN` handler
# applies here.
- def handle_info({:EXIT, _, reason}, %{pid: pid, socket_path: socket_path}) do
+ def handle_info({:EXIT, _, reason}, %{pid: pid, socket_path: socket_path, os_pid: os_pid}) do
Logger.debug(fn -> "Watcher exiting. reason: #{inspect(reason)}" end)
File.rm!(socket_path)
- # Exile.Process.stop(pid)
- # attempt_graceful_exit(socket_path)
+ Exile.Process.stop(pid)
+ attempt_graceful_exit(os_pid)
{:stop, reason, nil}
end
# For a proper process exit, the parent of the child *must* wait() for the
# child's termination and "pick up" after the exit (receive the child's
# exit status). Resources acquired by the child, such as file descriptors,
# won't be released even if the child process itself is terminated.
#
# Escalates in three steps, checking after each whether the process is gone:
# wait, then SIGTERM, then SIGKILL. `throw(:done)` is used as an early exit
# and is caught at the bottom. Raises if the process is still alive after
# SIGKILL.
- defp attempt_graceful_exit(socket_path) do
+ defp attempt_graceful_exit(os_pid) do
try do
Logger.debug(fn -> "Stopping external program" end)
- # sys_close is idempotent, calling it multiple times is okay
- ProcessNif.sys_close(socket_path, ProcessNif.to_process_fd(:stdin))
- ProcessNif.sys_close(socket_path, ProcessNif.to_process_fd(:stdout))
-
# at most we wait 100ms for the program to exit on its own
- process_exit?(socket_path, 100) && throw(:done)
+ process_exit?(os_pid, 100) && throw(:done)
Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
- ProcessNif.sys_terminate(socket_path)
- process_exit?(socket_path, 100) && throw(:done)
+ Nif.nif_kill(os_pid, :sigterm)
+ process_exit?(os_pid, 100) && throw(:done)
Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
- ProcessNif.sys_kill(socket_path)
- process_exit?(socket_path, 1000) && throw(:done)
+ Nif.nif_kill(os_pid, :sigkill)
+ process_exit?(os_pid, 1000) && throw(:done)
Logger.error("[exile] failed to kill external process")
raise "Failed to kill external process"
catch
:done -> Logger.debug(fn -> "External program exited successfully" end)
end
end
- defp process_exit?(socket_path) do
- match?({:ok, _}, ProcessNif.sys_wait(socket_path))
- end
# True when the NIF reports that `os_pid` is no longer alive.
+ defp process_exit?(os_pid), do: !Nif.nif_is_os_pid_alive(os_pid)
# Checks once; if the process is still alive, sleeps for `timeout`
# milliseconds and checks a second time. It does not poll in between.
- defp process_exit?(socket_path, timeout) do
- if process_exit?(socket_path) do
+ defp process_exit?(os_pid, timeout) do
+ if process_exit?(os_pid) do
true
else
:timer.sleep(timeout)
- process_exit?(socket_path)
+ process_exit?(os_pid)
end
end
end
diff --git a/lib/uds.ex b/lib/uds.ex
index 88c13a7..bd955f2 100644
--- a/lib/uds.ex
+++ b/lib/uds.ex
@@ -1,110 +1,110 @@
# Listens on a UNIX domain socket, launches the `spawner` executable pointed at
# that socket, receives file descriptors over it and reads from one of them
# through the NIF, printing progress to stdout.
# NOTE(review): this looks like a manual test harness rather than library
# code; confirm.
defmodule UDS do
alias Exile.ProcessNif, as: Nif
# Binds a local stream socket at `path` (deleting any existing file first),
# starts the spawner with `args`, accepts one connection with a timeout of
# 2000, handles one message from it and closes the accepted socket.
def main(path, args) do
{:ok, uds} = :socket.open(:local, :stream, :default)
_ = :file.delete(path)
{:ok, _} = :socket.bind(uds, %{family: :local, path: path})
:ok = :socket.listen(uds)
IO.puts("Listening UNIX socket #{path} for worker connection")
_port = exec(path, args)
case :socket.accept(uds, 2000) do
{:ok, usock} ->
echo_acc_loop(usock, path)
:socket.close(usock)
error ->
# NOTE(review): `error` is the raw non-`{:ok, _}` return value, not an
# exception; confirm that `raise/1` on it gives the intended error.
raise error
end
end
# Opens the spawner executable as a port with `path` followed by `args` as its
# argument list, prints the port and returns it.
defp exec(path, args) do
spawner_path = Path.expand("../c_src/spawner", __DIR__)
Port.open({:spawn_executable, to_charlist(spawner_path)}, [
:nouse_stdio,
:exit_status,
:binary,
args: [path | args]
])
|> IO.inspect(label: :os_proc)
end
# Receives a single message from the socket and hands it to `handle_msg/1`.
# Despite the name it does not loop: the recursive call is commented out.
# On a receive error it prints the error and closes the socket.
def echo_acc_loop(uds, _path) do
case :socket.recvmsg(uds) do
{:ok, msg} ->
handle_msg(msg)
# echo_acc_loop(uds, path)
err ->
IO.puts("Failed to recvmsg: #{inspect(err)}")
:socket.close(uds)
end
end
# Extracts two native-endian 32-bit integers from a message carrying exactly
# one `:rights` control entry, prints both, wraps `write_fd` with
# `Nif.nif_create_fd/1` and reads from it. There is no clause for messages of
# any other shape.
def handle_msg(%{
ctrl: [
%{
data: <<read_fd::native-32, write_fd::native-32, _rest::binary>>,
level: :socket,
type: :rights
}
]
}) do
IO.inspect(read_fd, label: :read_fd)
IO.inspect(write_fd, label: :write_fd)
# read(write_fd)
{:ok, fd} = Nif.nif_create_fd(write_fd)
nif_read(fd)
end
# Reads from `fd` in requests of 65535 until an empty binary is returned, which
# is treated as end of input and closes `fd`. On `:eagain` it waits for a
# readiness message and retries. Any other error is printed and ends the read.
defp nif_read(fd) do
- case Nif.nif_read_async(fd, 65535) do
+ case Nif.nif_read(fd, 65535) do
{:ok, <<>>} ->
Nif.nif_close(fd)
IO.puts("\nEOF")
{:ok, bin} ->
IO.puts("READ: #{IO.iodata_length(bin)}")
nif_read(fd)
{:error, :eagain} ->
IO.puts(":eagain")
wait_for_ready(fd)
{:error, error} ->
IO.inspect(error)
end
end
# Blocks until a `{:select, _, _, :ready_input}` message arrives, then resumes
# reading. There is no timeout.
# NOTE(review): the message is presumably sent on behalf of the NIF once the
# descriptor becomes readable; confirm against the C source.
defp wait_for_ready(fd) do
receive do
{:select, _read_resource, _ref, :ready_input} ->
nif_read(fd)
end
end
# defp read(fd) do
# port = Port.open({:fd, fd, fd}, [:in, :binary, :eof])
# do_read(port)
# end
# defp do_read(port) do
# receive do
# {^port, {:data, data}} ->
# IO.puts(IO.iodata_length(data))
# do_read(port)
# {^port, msg} ->
# IO.inspect(msg)
# msg ->
# IO.inspect(msg)
# do_read(port)
# end
# end
end

File Metadata

Mime Type
text/x-diff
Expires
Thu, Nov 28, 8:31 AM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
41001
Default Alt Text
(45 KB)

Event Timeline