Page MenuHomePhorge

No OneTemporary

Size
18 KB
Referenced Files
None
Subscribers
None
diff --git a/c_src/exile.c b/c_src/exile.c
index a3c65bf..647a731 100644
--- a/c_src/exile.c
+++ b/c_src/exile.c
@@ -1,392 +1,392 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "erl_nif.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#ifdef ERTS_DIRTY_SCHEDULERS
#define USE_DIRTY_IO ERL_NIF_DIRTY_JOB_IO_BOUND
#else
#define USE_DIRTY_IO 0
#endif
-//#define DEBUG
+// #define DEBUG
/* debug(...): when DEBUG is defined, prints its printf-style arguments to
 * stderr followed by a newline; otherwise it expands to nothing. */
#ifdef DEBUG
#define debug(...) \
do { \
+ enif_fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, \
+ __func__); \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
-#define start_timing() ErlNifTime __start = enif_monotonic_time(ERL_NIF_USEC)
-#define elapsed_microseconds() (enif_monotonic_time(ERL_NIF_USEC) - __start)
#else
#define debug(...)
-#define start_timing()
-#define elapsed_microseconds() 0
#endif
/* error(...): always prints its printf-style arguments to stderr followed by a
 * newline, regardless of DEBUG. */
#define error(...) \
do { \
+ enif_fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, \
+ __func__); \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
/* assert_argc(argc, count): logs an error and returns a badarg exception from
 * the enclosing function when argc differs from count.
 * NOTE(review): expands to a bare `if` (not wrapped in do { } while (0)) and
 * relies on a variable named `env` being in scope at the call site. */
+#define assert_argc(argc, count) \
+ if (argc != count) { \
+ error("number of arguments must be %d", count); \
+ return enif_make_badarg(env); \
+ }
+
/* Sentinel `demand` value accepted by nif_read: read whatever is available,
 * up to PIPE_BUF_SIZE bytes. */
static const int UNBUFFERED_READ = -1;
/* Upper bound on the number of bytes nif_read requests in a single read(). */
static const int PIPE_BUF_SIZE = 65535;
/* Value stored in a descriptor slot once close_fd() has closed it. */
static const int FD_CLOSED = -1;
/* Atoms below are created once in on_load(). */
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_UNDEFINED;
static ERL_NIF_TERM ATOM_INVALID_FD;
static ERL_NIF_TERM ATOM_PIPE_CLOSED;
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ALLOC_FAILED;
static ERL_NIF_TERM ATOM_SIGKILL;
static ERL_NIF_TERM ATOM_SIGTERM;
/* command exit types */
static ERL_NIF_TERM ATOM_EXIT;
static ERL_NIF_TERM ATOM_SIGNALED;
static ERL_NIF_TERM ATOM_STOPPED;
/* Only referenced by the commented-out make_exit_term() in the visible code. */
enum exit_type { NORMAL_EXIT, SIGNALED, STOPPED };
/* Close the descriptor stored at *fd unless the slot is already marked as
 * closed, then mark the slot closed so that a repeated call is a no-op. */
static void close_fd(int *fd) {
  if (*fd == FD_CLOSED)
    return;

  close(*fd);
  *fd = FD_CLOSED;
}
/* First callback registered in io_rt_init. Treats the resource payload as an
 * int descriptor slot and closes it if it is still open. */
static void io_resource_dtor(ErlNifEnv *env, void *obj) {
  (void)env;
  close_fd((int *)obj);
  debug("Exile io_resource_dtor called");
}
/* Second callback registered in io_rt_init. It performs no cleanup; it only
 * emits a debug trace carrying the descriptor it was invoked for. */
static void io_resource_stop(ErlNifEnv *env, void *obj, int fd,
                             int is_direct_call) {
  (void)env;
  (void)obj;
  (void)fd;
  (void)is_direct_call;
  debug("Exile io_resource_stop called %d", fd);
}
/* Third callback registered in io_rt_init. Closes the descriptor held by the
 * resource (if still open); the pid and monitor arguments are not inspected. */
static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
                             ErlNifMonitor *monitor) {
  (void)env;
  (void)pid;
  (void)monitor;
  close_fd((int *)obj);
  debug("Exile io_resource_down called");
}
/* Callback table handed to enif_open_resource_type_x() in on_load(). */
static ErlNifResourceTypeInit io_rt_init = {io_resource_dtor, io_resource_stop,
io_resource_down};
/* Resource type for descriptor resources; assigned in on_load(). The payload
 * of each resource is a single int (see nif_create_fd). */
static ErlNifResourceType *FD_RT;
/* Build the 2-tuple {ok, term}. */
static inline ERL_NIF_TERM make_ok(ErlNifEnv *env, ERL_NIF_TERM term) {
  ERL_NIF_TERM tagged = enif_make_tuple2(env, ATOM_OK, term);
  return tagged;
}
/* Build the 2-tuple {error, term}. */
static inline ERL_NIF_TERM make_error(ErlNifEnv *env, ERL_NIF_TERM term) {
  ERL_NIF_TERM tagged = enif_make_tuple2(env, ATOM_ERROR, term);
  return tagged;
}
/* Report the time spent between `start` and `stop` to the scheduler.
 *
 * Both timestamps are assumed to be in microseconds. The elapsed time divided
 * by 10 is passed to enif_consume_timeslice(); a result of 0 is raised to 1
 * and anything above 100 is capped at 100. */
static void notify_consumed_timeslice(ErlNifEnv *env, ErlNifTime start,
                                      ErlNifTime stop) {
  ErlNifTime percent = (ErlNifTime)((stop - start) / 10);

  if (percent == 0) {
    percent = 1;
  } else if (percent > 100) {
    percent = 100;
  }

  enif_consume_timeslice(env, percent);
}
/* Register the descriptor at *fd with enif_select() for write readiness,
 * using `fd` itself as the resource object.
 *
 * Returns the enif_select() result unchanged; any non-zero result is also
 * reported through perror(). */
static int select_write(ErlNifEnv *env, int *fd) {
  const int status =
      enif_select(env, *fd, ERL_NIF_SELECT_WRITE, fd, NULL, ATOM_UNDEFINED);

  if (status == 0)
    return status;

  perror("select_write()");
  return status;
}
/* NIF: write the binary argv[1] to the descriptor resource argv[0].
 *
 * Returns:
 *   {ok, BytesWritten}           - whole binary written, or partially written
 *                                  (in which case write-select is armed first)
 *   {error, eagain}              - write would block; write-select is armed
 *   {error, invalid_fd_resource} - argv[0] is not an FD_RT resource
 *   {error, Int}                 - errno of a failed write(), or the non-zero
 *                                  result of select_write()
 * Raises badarg when the argument count is wrong, argv[1] is not a binary, or
 * the binary is empty.
 */
static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
- if (argc != 2)
- return enif_make_badarg(env);
+ assert_argc(argc, 2);
ErlNifTime start;
ssize_t size;
ErlNifBinary bin;
int write_errno;
int *fd;
start = enif_monotonic_time(ERL_NIF_USEC);
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (enif_inspect_binary(env, argv[1], &bin) != true)
return enif_make_badarg(env);
if (bin.size == 0)
return enif_make_badarg(env);
/* should we limit the bin.size here? */
size = write(*fd, bin.data, bin.size);
/* capture errno before any later call can overwrite it */
write_errno = errno;
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (size >= (ssize_t)bin.size) { // request completely satisfied
return make_ok(env, enif_make_int(env, size));
} else if (size >= 0) { // request partially satisfied
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, enif_make_int(env, size));
} else if (write_errno == EAGAIN || write_errno == EWOULDBLOCK) { // busy
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else { // Error
perror("write()");
return make_error(env, enif_make_int(env, write_errno));
}
}
/* Register the descriptor at *fd with enif_select() for read readiness,
 * using `fd` itself as the resource object.
 *
 * Returns the enif_select() result unchanged; any non-zero result is also
 * reported through perror(). */
static int select_read(ErlNifEnv *env, int *fd) {
  const int status =
      enif_select(env, *fd, ERL_NIF_SELECT_READ, fd, NULL, ATOM_UNDEFINED);

  if (status == 0)
    return status;

  perror("select_read()");
  return status;
}
/* NIF: wrap the integer descriptor argv[0] in an FD_RT resource.
 *
 * Returns {ok, Resource} on success. When argv[0] is not an integer the
 * resource is released and the bare atom `error` (not a tuple) is returned.
 */
static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
- if (argc != 1)
- return enif_make_badarg(env);
+ assert_argc(argc, 1);
ERL_NIF_TERM term;
int *fd;
/* NOTE(review): the result of enif_alloc_resource() is used without a NULL
 * check. */
fd = enif_alloc_resource(FD_RT, sizeof(int));
if (!enif_get_int(env, argv[0], fd))
goto error_exit;
term = enif_make_resource(env, fd);
/* drop this function's reference; the term made above still refers to it */
enif_release_resource(fd);
return make_ok(env, term);
error_exit:
enif_release_resource(fd);
return ATOM_ERROR;
}
/* NIF: read from the descriptor resource argv[0]; argv[1] is the demand.
 *
 * The demand is either UNBUFFERED_READ (read up to PIPE_BUF_SIZE bytes) or a
 * positive byte count, which is capped at PIPE_BUF_SIZE.
 *
 * Returns:
 *   {ok, Binary}                 - bytes read (empty binary when read()
 *                                  returned 0); read-select is armed first
 *                                  when fewer bytes than demanded arrived
 *   {error, eagain}              - read would block; read-select is armed
 *   {error, invalid_fd_resource} - argv[0] is not an FD_RT resource
 *   {error, Int}                 - errno of a failed read(), or the non-zero
 *                                  result of select_read()
 * Raises badarg when the argument count is wrong, argv[1] is not an integer,
 * or the demand is below 1 and not UNBUFFERED_READ.
 */
static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
- if (argc != 2)
- return enif_make_badarg(env);
+ assert_argc(argc, 2);
ErlNifTime start;
int size, demand;
int *fd;
start = enif_monotonic_time(ERL_NIF_USEC);
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (!enif_get_int(env, argv[1], &demand))
return enif_make_badarg(env);
size = demand;
if (demand == UNBUFFERED_READ) {
size = PIPE_BUF_SIZE;
} else if (demand < 1) {
return enif_make_badarg(env);
} else if (demand > PIPE_BUF_SIZE) {
size = PIPE_BUF_SIZE;
}
/* variable-length stack buffer; size is at most PIPE_BUF_SIZE here */
unsigned char buf[size];
ssize_t result = read(*fd, buf, size);
/* capture errno before any later call can overwrite it */
int read_errno = errno;
ERL_NIF_TERM bin_term = 0;
if (result >= 0) {
/* no need to release this binary */
unsigned char *temp = enif_make_new_binary(env, result, &bin_term);
memcpy(temp, buf, result);
}
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (result >= 0) {
/* we do not 'select' if the demand is completely satisfied OR on EOF OR
 * when it is an UNBUFFERED_READ */
/* NOTE(review): when demand was capped to PIPE_BUF_SIZE above, result can
 * never equal demand, so a full read of the capped size takes the
 * "partially satisfied" branch. */
if (result == demand || result == 0 || demand == UNBUFFERED_READ) {
return make_ok(env, bin_term);
} else { // demand partially satisfied
int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, bin_term);
}
} else {
if (read_errno == EAGAIN || read_errno == EWOULDBLOCK) { // busy
int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
} else { // Error
perror("read()");
return make_error(env, enif_make_int(env, read_errno));
}
}
}
/* NIF: close the descriptor held by the FD_RT resource argv[0].
 *
 * Returns `ok` (also when the descriptor was already closed), or
 * {error, invalid_fd_resource} when argv[0] is not an FD_RT resource.
 */
static ERL_NIF_TERM nif_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
- if (argc != 1)
- return enif_make_badarg(env);
+ assert_argc(argc, 1);
int *fd;
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
close_fd(fd);
return ATOM_OK;
}
/* NIF: probe the OS process whose pid is argv[0] with kill(pid, 0).
 *
 * Returns `true` when kill() succeeds and `false` for any failure. Raises
 * badarg when the argument count is wrong or argv[0] is not an integer.
 */
static ERL_NIF_TERM nif_is_os_pid_alive(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
- pid_t pid;
+ assert_argc(argc, 1);
- if (argc != 1)
- return enif_make_badarg(env);
+ pid_t pid;
// we should not assume pid type to be `int`?
/* NOTE(review): the cast below writes an int through a pid_t pointer, which
 * is only correct where pid_t has the same size as int. */
if (!enif_get_int(env, argv[0], (int *)&pid))
return enif_make_badarg(env);
/* signal 0 performs the permission/existence check without sending anything */
int result = kill(pid, 0);
if (result == 0)
return ATOM_TRUE;
else
return ATOM_FALSE;
}
/* NIF: send a signal to the OS process whose pid is argv[0].
 *
 * argv[1] selects the signal and must be the atom `sigkill` or `sigterm`.
 *
 * Returns `ok` on success, or {error, "failed to send signal"} (a string)
 * when kill() fails. Raises badarg when the argument count is wrong, argv[0]
 * is not an integer, or argv[1] is neither of the two atoms.
 */
static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
+ assert_argc(argc, 2);
+
pid_t pid;
int ret;
- if (argc != 2)
- return enif_make_badarg(env);
-
// we should not assume pid type to be `int`?
if (!enif_get_int(env, argv[0], (int *)&pid))
return enif_make_badarg(env);
if (enif_compare(argv[1], ATOM_SIGKILL) == 0)
ret = kill(pid, SIGKILL);
else if (enif_compare(argv[1], ATOM_SIGTERM) == 0)
ret = kill(pid, SIGTERM);
else
return enif_make_badarg(env);
if (ret != 0) {
perror("[exile] failed to send signal");
return make_error(
env, enif_make_string(env, "failed to send signal", ERL_NIF_LATIN1));
}
return ATOM_OK;
}
/* static ERL_NIF_TERM make_exit_term(ErlNifEnv *env, ExecContext *ctx) { */
/* switch (ctx->exit_type) { */
/* case NORMAL_EXIT: */
/* return make_ok(env, enif_make_tuple2(env, ATOM_EXIT, */
/* enif_make_int(env,
* ctx->exit_status))); */
/* case SIGNALED: */
/* /\* exit_status here points to signal number *\/ */
/* return make_ok(env, enif_make_tuple2(env, ATOM_SIGNALED, */
/* enif_make_int(env,
* ctx->exit_status))); */
/* case STOPPED: */
/* return make_ok(env, enif_make_tuple2(env, ATOM_STOPPED, */
/* enif_make_int(env,
* ctx->exit_status))); */
/* default: */
/* error("Invalid wait status"); */
/* return make_error(env, ATOM_UNDEFINED); */
/* } */
/* } */
/* NIF load callback: opens (or takes over) the "exile_resource" resource type
 * and creates every atom used by this module.
 *
 * Returns 0 on success. Returns -1 when the resource type cannot be opened,
 * so the library fails to load instead of running with FD_RT == NULL. */
static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
  (void)priv;
  (void)load_info;

  FD_RT =
      enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
                                ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
  if (FD_RT == NULL) {
    error("failed to open resource type");
    return -1;
  }

  ATOM_TRUE = enif_make_atom(env, "true");
  ATOM_FALSE = enif_make_atom(env, "false");
  ATOM_OK = enif_make_atom(env, "ok");
  ATOM_ERROR = enif_make_atom(env, "error");
  ATOM_UNDEFINED = enif_make_atom(env, "undefined");
  ATOM_INVALID_FD = enif_make_atom(env, "invalid_fd_resource");
  ATOM_PIPE_CLOSED = enif_make_atom(env, "closed_pipe");

  /* command exit types */
  ATOM_EXIT = enif_make_atom(env, "exit");
  ATOM_SIGNALED = enif_make_atom(env, "signaled");
  ATOM_STOPPED = enif_make_atom(env, "stopped");

  ATOM_EAGAIN = enif_make_atom(env, "eagain");
  ATOM_ALLOC_FAILED = enif_make_atom(env, "alloc_failed");

  /* signal names accepted by nif_kill */
  ATOM_SIGTERM = enif_make_atom(env, "sigterm");
  ATOM_SIGKILL = enif_make_atom(env, "sigkill");

  return 0;
}
/* NIF unload callback: emits a debug trace and frees the private data
 * pointer.
 * NOTE(review): on_load() in this file never assigns *priv, so `priv` is
 * presumably NULL here - confirm that enif_free() tolerates that. */
static void on_unload(ErlNifEnv *env, void *priv) {
  (void)env;
  debug("exile unload");
  enif_free(priv);
}
/* NIF export table: {name, arity, implementation, flags}. Every entry carries
 * USE_DIRTY_IO, which is ERL_NIF_DIRTY_JOB_IO_BOUND when
 * ERTS_DIRTY_SCHEDULERS is defined and 0 otherwise. */
static ErlNifFunc nif_funcs[] = {
{"nif_read", 2, nif_read, USE_DIRTY_IO},
{"nif_create_fd", 1, nif_create_fd, USE_DIRTY_IO},
{"nif_write", 2, nif_write, USE_DIRTY_IO},
{"nif_close", 1, nif_close, USE_DIRTY_IO},
{"nif_is_os_pid_alive", 1, nif_is_os_pid_alive, USE_DIRTY_IO},
{"nif_kill", 2, nif_kill, USE_DIRTY_IO}};
/* Registers the table for module Elixir.Exile.ProcessNif with on_load and
 * on_unload as the only lifecycle callbacks. */
ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
diff --git a/c_src/spawner.c b/c_src/spawner.c
index 58df12b..ce42775 100644
--- a/c_src/spawner.c
+++ b/c_src/spawner.c
@@ -1,188 +1,206 @@
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>
+// #define DEBUG
+
/* debug(...): when DEBUG is defined, prints a file:line/function prefix and
 * the printf-style arguments to stderr; otherwise expands to nothing.
 * Unlike a println-style helper, no newline is appended, so callers must put
 * one in the format string. */
+#ifdef DEBUG
+#define debug(...) \
+ do { \
+ fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, __func__); \
+ fprintf(stderr, __VA_ARGS__); \
+ } while (0)
+#else
+#define debug(...)
+#endif
+
/* error(...): same output as debug(...) but always enabled. No newline is
 * appended. */
+#define error(...) \
+ do { \
+ fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, __func__); \
+ fprintf(stderr, __VA_ARGS__); \
+ } while (0)
/* Indices of the read and write ends within an int[2] filled by pipe(). */
static const int PIPE_READ = 0;
static const int PIPE_WRITE = 1;
/* We are choosing an exit code which is not reserved see:
* https://www.tldp.org/LDP/abs/html/exitcodes.html. */
static const int FORK_EXEC_FAILURE = 125;
/* Add `flags` to the descriptor `fd`.
 *
 * `flags` is a mask of O_* values. File status flags such as O_NONBLOCK are
 * OR-ed into the current status flags with F_SETFL. O_CLOEXEC is not a status
 * flag and F_SETFL does not apply it, so when it is requested the
 * close-on-exec descriptor flag is set separately through F_SETFD/FD_CLOEXEC.
 *
 * Returns a negative value if any fcntl() call fails (errno is left as set by
 * fcntl), otherwise the result of the final F_SETFL call (0 on success).
 */
static int set_flag(int fd, int flags) {
  int status_flags = flags;

#ifdef O_CLOEXEC
  if (flags & O_CLOEXEC) {
    const int fd_flags = fcntl(fd, F_GETFD);
    if (fd_flags < 0)
      return -1;
    if (fcntl(fd, F_SETFD, fd_flags | FD_CLOEXEC) < 0)
      return -1;
    /* already handled above; keep it out of the F_SETFL mask */
    status_flags &= ~O_CLOEXEC;
  }
#endif

  const int current = fcntl(fd, F_GETFL);
  if (current < 0)
    return -1;

  return fcntl(fd, F_SETFL, current | status_flags);
}
/* Send `read_fd` and `write_fd` over the UNIX socket `socket` as a single
 * SCM_RIGHTS control message.
 *
 * Returns EXIT_SUCCESS when sendmsg() succeeds and EXIT_FAILURE otherwise.
 */
static int send_io_fds(int socket, int read_fd, int write_fd) {
struct msghdr msg = {0};
struct cmsghdr *cmsg;
int fds[2];
/* buf holds the control message; dup is the regular data payload */
char buf[CMSG_SPACE(2 * sizeof(int))], dup[256];
memset(buf, '\0', sizeof(buf));
/* NOTE(review): dup is never initialized, so 256 bytes of indeterminate
 * stack contents are sent as the message payload. */
struct iovec io = {.iov_base = &dup, .iov_len = sizeof(dup)};
msg.msg_iov = &io;
msg.msg_iovlen = 1;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(2 * sizeof(int));
/* order matters to the receiver: read_fd first, then write_fd */
fds[0] = read_fd;
fds[1] = write_fd;
memcpy((int *)CMSG_DATA(cmsg), fds, 2 * sizeof(int));
if (sendmsg(socket, &msg, 0) < 0) {
- printf("Failed to send message");
+ debug("Failed to send message");
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
/* This is not ideal, but as of now there is no portable way to do this */
/*
static void close_all_fds() {
int fd_limit = (int)sysconf(_SC_OPEN_MAX);
for (int i = STDERR_FILENO + 1; i < fd_limit; i++)
close(i);
}
*/
/* Close both ends of both pipes in `pipes`.
 *
 * Only descriptors greater than 0 are closed, so slots still holding their
 * initial 0 are skipped. */
static void close_all(int pipes[2][2]) {
  for (int idx = 0; idx < 2; idx++) {
    const int rd = pipes[idx][PIPE_READ];
    const int wr = pipes[idx][PIPE_WRITE];

    if (rd > 0)
      close(rd);
    if (wr > 0)
      close(wr);
  }
}
/* Create the stdin/stdout pipes for the command, hand the parent-side ends to
 * the peer on `socket`, then replace the current process with `bin`.
 *
 * There is no fork() here: on success execvp() does not return. Returns 1
 * when pipe creation, flag setup, or sending the descriptors fails; failures
 * after that point terminate the process with FORK_EXEC_FAILURE.
 */
static int exec_process(char const *bin, char *const *args, int socket) {
/* pipes[STDIN_FILENO] feeds the command's stdin, pipes[STDOUT_FILENO] carries
 * its stdout */
int pipes[2][2] = {{0, 0}, {0, 0}};
if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1) {
perror("[spawner] failed to create pipes");
close_all(pipes);
return 1;
}
- printf("[spawner] pipe: done\r\n");
+ debug("[spawner] pipe: done\r\n");
const int r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
const int w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
const int r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
const int w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
/* the ends that are sent to the peer (w_cmdin, r_cmdout) are additionally
 * made non-blocking */
if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0) {
perror("[spawner] failed to set flags for pipes");
close_all(pipes);
return 1;
}
- printf("[spawner] set_flag done\r\n");
+ debug("[spawner] set_flag done\r\n");
if (send_io_fds(socket, w_cmdin, r_cmdout) != EXIT_SUCCESS) {
perror("[spawner] failed to send fd via socket");
close_all(pipes);
return 1;
}
- printf("[spawner] send_io_fds: done\r\n");
+ debug("[spawner] send_io_fds: done\r\n");
// TODO: close all fds including socket
// close_all_fds();
/* NOTE(review): the message below labels its second value "r_cmdout" but
 * prints w_cmdout. */
- printf("[spawner] w_cmdin: %d r_cmdout: %d\r\n", w_cmdin, w_cmdout);
- printf("[spawner] argv: %s\r\n", args[1]);
+ debug("[spawner] w_cmdin: %d r_cmdout: %d\r\n", w_cmdin, w_cmdout);
+ debug("[spawner] argv: %s\r\n", args[1]);
/* the peer now holds its own copies of these two ends */
close(w_cmdin);
close(r_cmdout);
close(STDIN_FILENO);
if (dup2(r_cmdin, STDIN_FILENO) < 0) {
perror("[spawner] failed to dup to stdin");
_exit(FORK_EXEC_FAILURE);
}
close(STDOUT_FILENO);
if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
perror("[spawner] failed to dup to stdout");
_exit(FORK_EXEC_FAILURE);
}
execvp(bin, args);
/* only reached when execvp() failed */
perror("[spawner] execvp(): failed");
_exit(FORK_EXEC_FAILURE);
}
/* Connect to the UNIX stream socket at `sock_path` and pass the connection to
 * exec_process() together with the command `bin` and its argument vector.
 *
 * Returns EXIT_FAILURE when the socket cannot be created or connected, and -1
 * when exec_process() reports a failure. On success exec_process() does not
 * return.
 */
static int handle(const char *sock_path, const char *bin, char *const *args) {
int sfd;
struct sockaddr_un addr;
- printf("[spawner] hadle\r\n");
+ debug("[spawner] hadle\r\n");
sfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sfd == -1) {
- printf("Failed to create socket");
+ debug("Failed to create socket");
return EXIT_FAILURE;
}
memset(&addr, 0, sizeof(struct sockaddr_un));
addr.sun_family = AF_UNIX;
/* addr was zeroed above and at most sizeof - 1 bytes are copied, so sun_path
 * stays NUL-terminated (a longer sock_path is silently truncated) */
strncpy(addr.sun_path, sock_path, sizeof(addr.sun_path) - 1);
- printf("[spawner] connect\r\n");
+ debug("[spawner] connect\r\n");
if (connect(sfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) ==
-1) {
- printf("Failed to connect to socket");
+ debug("Failed to connect to socket");
/* NOTE(review): sfd is not closed on this path. */
return EXIT_FAILURE;
}
- printf("[spawner] exec_process\r\n");
+ debug("[spawner] exec_process\r\n");
if (exec_process(bin, args, sfd) != 0)
return -1;
// we never reach here
return 0;
}
/* Entry point. Expected arguments: argv[1] is the socket path, argv[2] is the
 * command to execute, and argv[3..] are that command's arguments.
 *
 * With fewer than three argv entries the program exits with EXIT_FAILURE;
 * otherwise it exits with whatever handle() returns (handle() only returns on
 * failure).
 */
int main(int argc, const char *argv[]) {
int status = EXIT_SUCCESS;
const char **proc_argv;
- printf("[spawner] argc: %d\r\n", argc);
+ debug("[spawner] argc: %d\r\n", argc);
if (argc < 3) {
- printf("[spawner] exit\r\n");
+ debug("[spawner] exit\r\n");
status = EXIT_FAILURE;
} else {
/* argc - 2 entries (argv[2..]) plus the terminating NULL.
 * NOTE(review): the malloc() result is not checked. */
proc_argv = malloc((argc - 2 + 1) * sizeof(char *));
int i;
for (i = 2; i < argc; i++)
proc_argv[i - 2] = argv[i];
proc_argv[i - 2] = NULL;
- printf("[spawner] exec: %s\r\n", argv[2]);
+ debug("[spawner] exec: %s\r\n", argv[2]);
status = handle(argv[1], argv[2], (char *const *)proc_argv);
}
exit(status);
}

File Metadata

Mime Type
text/x-diff
Expires
Thu, Nov 28, 8:37 AM (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
41007
Default Alt Text
(18 KB)

Event Timeline