Page MenuHomePhorge

No OneTemporary

Size
96 KB
Referenced Files
None
Subscribers
None
diff --git a/.github/workflows/elixir.yaml b/.github/workflows/elixir.yaml
index 82c60db..d56cce1 100644
--- a/.github/workflows/elixir.yaml
+++ b/.github/workflows/elixir.yaml
@@ -1,30 +1,33 @@
name: Elixir CI
on: push
jobs:
test:
runs-on: ubuntu-latest
name: OTP ${{matrix.otp}} / Elixir ${{matrix.elixir}}
strategy:
matrix:
- otp: [21.0, 22.2]
- elixir: [1.7.0, 1.10.1]
+ include:
+ - elixir: 1.9.4
+ otp: 22.2
+ - elixir: 1.10.4
+ otp: 23.0
steps:
- uses: actions/checkout@v2
- uses: actions/setup-elixir@v1
with:
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}
- name: Install Dependencies
run: mix deps.get
- name: gcc version
run: gcc --version
- name: Compile
run: mix compile --force --warnings-as-errors
- name: Check format
run: mix format --check-formatted
# - name: Run credo
# run: mix credo
- name: Run Tests
run: mix test --exclude skip:true
diff --git a/.gitignore b/.gitignore
index 7066211..a9fb488 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,25 +1,30 @@
# The directory Mix will write compiled artifacts to.
/_build/
# If you run "mix test --cover", coverage assets end up here.
/cover/
# The directory Mix downloads your dependencies sources to.
/deps/
# Where 3rd-party dependencies like ExDoc output generated docs.
/doc/
# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch
# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump
# Also ignore archive artifacts (built via "mix archive.build").
*.ez
# Ignore package tarball (built via "mix hex.build").
exile-*.tar
-.dir-locals.el
\ No newline at end of file
+.dir-locals.el
+
+# Bench
+/bench/_build/
+/bench/deps/
+/bench/output/
diff --git a/Makefile b/Makefile
index c037faf..a973c8c 100644
--- a/Makefile
+++ b/Makefile
@@ -1,24 +1,28 @@
calling_from_make:
mix compile
UNAME := $(shell uname)
CFLAGS ?= -Wall -Werror -Wno-unused-parameter -pedantic -std=c99 -O2
ifeq ($(UNAME), Darwin)
TARGET_CFLAGS ?= -fPIC -undefined dynamic_lookup -dynamiclib -Wextra
endif
ifeq ($(UNAME), Linux)
TARGET_CFLAGS ?= -fPIC -shared
endif
-all: priv/exile.so
+all: priv/exile.so priv/spawner
priv/exile.so: c_src/exile.c
mkdir -p priv
$(CC) -I$(ERL_INTERFACE_INCLUDE_DIR) $(TARGET_CFLAGS) $(CFLAGS) c_src/exile.c -o priv/exile.so
+priv/spawner: c_src/spawner.c
+ mkdir -p priv
+ $(CC) $(CFLAGS) c_src/spawner.c -o priv/spawner
+
clean:
- @rm -rf priv/exile.so
+ @rm -rf priv/exile.so priv/spawner
diff --git a/README.md b/README.md
index 661b67b..88ab046 100644
--- a/README.md
+++ b/README.md
@@ -1,75 +1,75 @@
# Exile
-Exile is an alternative to beam [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure, non-blocking io, and tries to fix issues with ports.
+Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure, non-blocking io, and tries to fix issues with ports.
+
+Exile is built around the idea of having demand-driven, asynchronous interaction with an external process. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses a NIF. See [Rationale](#rationale) for details. It also provides a stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as
-Exile is built around the idea of having demand-driven, asynchronous interaction with external command. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses NIF. See [Rationale](#rationale) for details. It also provides stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as
``` elixir
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
`Exile.stream!` is a convenience wrapper around `Exile.Process`. If you want more control over stdin, stdout, and os process use `Exile.Process` directly.
**Note: Exile is experimental and it is still work-in-progress. Exile is based on NIF, please know the implications of it before using it**
## Rationale
Approaches, and issues
#### Port
Port is the default way of executing external commands. This is okay when you have control over the external program's implementation and the interaction is minimal. Port has several important issues.
* it can end up creating [zombie process](https://hexdocs.pm/elixir/Port.html#module-zombie-operating-system-processes)
* cannot selectively close stdin. This is required when the external programs act on EOF from stdin
* it sends command output as a message to the beam process. This does not put back pressure on the external program and can lead to exhausting VM memory
-#### Port based solutions
+#### Middleware based solutions
-There are many port based libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. These solve the first two issues associated with ports: zombie process and selectively closing STDIN. But not the third issue: having back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally uses port for reading the output and writing input. Note that these libraries are solving a different subset of issues and have different functionality, please check the relevant project page for details.
+Libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. solve the first two issues associated with ports - zombie processes and selectively closing STDIN - but not the third issue - having back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally, they use a port for reading the output and writing the input. Note that these libraries solve different subsets of issues and have different functionality; please check the relevant project page for details.
* no back-pressure
* additional os process (middleware) for every execution of your program
* in a few cases, such as Porcelain, the user has to install this external program explicitly
* might not be suitable when the program requires constant communication between beam process and external program
-Note that Unlike Exile, bugs in port based implementation does not bring down beam VM.
+On the plus side, unlike Exile, bugs in these implementations do not bring down the whole beam VM.
#### [ExCmd](https://github.com/akash-akya/ex_cmd)
This is my other stab at solving back pressure on the external program issue. It implements a demand-driven protocol using [odu](https://github.com/akash-akya/odu) to solve this. Since ExCmd is also a port based solution, the concerns previously mentioned apply to ExCmd too.
## Exile
-Exile does non-blocking, asynchronous io system calls using NIF. Since it makes non-blocking system calls, schedulers are never blocked indefinitely. It does this in asynchronous fashion using `enif_select` so its efficient.
+Internally, Exile uses non-blocking, asynchronous system calls to interact with the external process. Instead of a port's message-based communication, it does raw stdio using a NIF, with asynchronous system calls for IO. Most of the system calls are non-blocking, so it should not block the beam schedulers. It makes use of dirty schedulers for IO.
-**Advantages over other approaches:**
+**Highlights**
-* solves all three issues associated with port
-* it does not use any middleware
+* Back pressure
+* it does not use any middleware program
* no additional os process. no performance/resource cost
* no need to install any external command
-* can run many external programs in parallel without adversely affecting schedulers
-* stream abstraction for interacting with the external program
-* should be portable across POSIX compliant operating systems (not tested)
+* tries to handle zombie processes by attempting to clean up the external process. *But* since there is no middleware involved with Exile, it is still possible to end up with a zombie process
+* stream abstraction
If you are executing a huge number of external programs **concurrently** (more than a few hundred), you might have to increase the open file descriptors limit (`ulimit -n`)
Non-blocking io can be used for other interesting things. Such as reading named pipe (FIFO) files. `Exile.stream!(~w(cat data.pipe))` does not block schedulers so you can open hundreds of fifo files unlike default `file` based io.
-##### TODO
-* add benchmarks results
+#### TODO
+* add benchmark results
### 🚨 Obligatory NIF warning
-As with any NIF based solution, bugs or issues in Exile implementation **can bring down the beam VM**. But NIF implementation is comparatively small and mostly uses POSIX system calls, spawned external processes are still completely isolated at OS level and the port issues it tries to solve are critical.
+As with any NIF based solution, bugs or issues in Exile implementation **can bring down the beam VM**. But NIF implementation is comparatively small and mostly uses POSIX system calls. Also, spawned external processes are still completely isolated at OS level.
If all you want is to run a command with no communication, then just sticking with `System.cmd` is a better option.
### License
Copyright (c) 2020 Akash Hiremath.
Exile source code is released under Apache License 2.0. Check [LICENSE](LICENSE.md) for more information.
diff --git a/bench/io.exs b/bench/io.exs
new file mode 100644
index 0000000..24b1a13
--- /dev/null
+++ b/bench/io.exs
@@ -0,0 +1,88 @@
+defmodule ExileBench.Read do
+ @size 1000
+ @chunk_size 1024 * 4
+ @data :crypto.strong_rand_bytes(@chunk_size)
+
+ defp exile_read_loop(s, @size), do: Exile.Process.close_stdin(s)
+
+ defp exile_read_loop(s, count) do
+ :ok = Exile.Process.write(s, @data)
+
+ case Exile.Process.read(s, @chunk_size) do
+ {:ok, _data} -> exile_read_loop(s, count + 1)
+ {:eof, _} -> :ok
+ end
+ end
+
+ def exile do
+ {:ok, s} = Exile.Process.start_link(["cat"])
+ :ok = exile_read_loop(s, 0)
+ Exile.Process.stop(s)
+ end
+
+ # port
+ defp port_read(_port, 0), do: :ok
+ defp port_read(port, size) do
+ receive do
+ {^port, {:data, data}} -> port_read(port, size - IO.iodata_length(data))
+ end
+ end
+
+ defp port_loop(_port, @size), do: :ok
+
+ defp port_loop(port, count) do
+ true = Port.command(port, @data)
+ :ok = port_read(port, @chunk_size)
+ port_loop(port, count + 1)
+ end
+
+ def port do
+ cat = :os.find_executable('cat')
+
+ port =
+ Port.open({:spawn_executable, cat}, [
+ :use_stdio,
+ :exit_status,
+ :binary
+ ])
+
+ port_loop(port, 0)
+ Port.close(port)
+ end
+
+
+ # ex_cmd
+ defp ex_cmd_read_loop(s, @size), do: ExCmd.Process.close_stdin(s)
+
+ defp ex_cmd_read_loop(s, count) do
+ :ok = ExCmd.Process.write(s, @data)
+
+ case ExCmd.Process.read(s, @chunk_size) do
+ {:ok, _data} -> ex_cmd_read_loop(s, count + 1)
+ :eof -> :ok
+ end
+ end
+
+ def ex_cmd do
+ {:ok, s} = ExCmd.Process.start_link(["cat"])
+ :ok = ex_cmd_read_loop(s, 0)
+ ExCmd.Process.stop(s)
+ end
+end
+
+read_jobs = %{
+ "Exile" => fn -> ExileBench.Read.exile() end,
+ "Port" => fn -> ExileBench.Read.port() end,
+ "ExCmd" => fn -> ExileBench.Read.ex_cmd() end
+}
+
+Benchee.run(read_jobs,
+ # parallel: 4,
+ warmup: 5,
+ time: 30,
+ memory_time: 1,
+ formatters: [
+ {Benchee.Formatters.HTML, file: Path.expand("output/read.html", __DIR__)},
+ Benchee.Formatters.Console
+ ]
+)
diff --git a/bench/mix.exs b/bench/mix.exs
new file mode 100644
index 0000000..5b4980a
--- /dev/null
+++ b/bench/mix.exs
@@ -0,0 +1,30 @@
+defmodule ExileBench.MixProject do
+ use Mix.Project
+
+ def project do
+ [
+ app: :exile_bench,
+ version: "0.1.0",
+ elixir: "~> 1.7",
+ start_permanent: Mix.env() == :prod,
+ deps: deps(),
+ aliases: aliases()
+ ]
+ end
+
+ defp aliases() do
+ [
+ "bench.io": ["run io.exs"]
+ ]
+ end
+
+ # Run "mix help deps" to learn about dependencies.
+ defp deps do
+ [
+ {:benchee, "~> 1.0"},
+ {:benchee_html, "~> 1.0"},
+ {:exile, "~> 0.1", path: "../", override: true},
+ {:ex_cmd, "~> 0.4.1"}
+ ]
+ end
+end
diff --git a/bench/mix.lock b/bench/mix.lock
new file mode 100644
index 0000000..0d1e6a1
--- /dev/null
+++ b/bench/mix.lock
@@ -0,0 +1,10 @@
+%{
+ "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
+ "benchee_html": {:hex, :benchee_html, "1.0.0", "5b4d24effebd060f466fb460ec06576e7b34a00fc26b234fe4f12c4f05c95947", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:benchee_json, "~> 1.0", [hex: :benchee_json, repo: "hexpm", optional: false]}], "hexpm", "5280af9aac432ff5ca4216d03e8a93f32209510e925b60e7f27c33796f69e699"},
+ "benchee_json": {:hex, :benchee_json, "1.0.0", "cc661f4454d5995c08fe10dd1f2f72f229c8f0fb1c96f6b327a8c8fc96a91fe5", [:mix], [{:benchee, ">= 0.99.0 and < 2.0.0", [hex: :benchee, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "da05d813f9123505f870344d68fb7c86a4f0f9074df7d7b7e2bb011a63ec231c"},
+ "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
+ "elixir_make": {:hex, :elixir_make, "0.6.2", "7dffacd77dec4c37b39af867cedaabb0b59f6a871f89722c25b28fcd4bd70530", [:mix], [], "hexpm", "03e49eadda22526a7e5279d53321d1cced6552f344ba4e03e619063de75348d9"},
+ "ex_cmd": {:hex, :ex_cmd, "0.4.1", "e7d194abdd8c98fd936d760d2a72e42075398a915902bcec74281927b5321bd5", [:mix], [{:gen_state_machine, "~> 2.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}], "hexpm", "d58dde62094419351c79d3215325300128f6895c643792d501adcd5cf507ef13"},
+ "gen_state_machine": {:hex, :gen_state_machine, "2.1.0", "a38b0e53fad812d29ec149f0d354da5d1bc0d7222c3711f3a0bd5aa608b42992", [:mix], [], "hexpm", "ae367038808db25cee2f2c4b8d0531522ea587c4995eb6f96ee73410a60fa06b"},
+ "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
+}
diff --git a/c_src/exile.c b/c_src/exile.c
index 9b7e529..fd1fa65 100644
--- a/c_src/exile.c
+++ b/c_src/exile.c
@@ -1,776 +1,391 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "erl_nif.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#ifdef ERTS_DIRTY_SCHEDULERS
#define USE_DIRTY_IO ERL_NIF_DIRTY_JOB_IO_BOUND
#else
#define USE_DIRTY_IO 0
#endif
-//#define DEBUG
+// #define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
+ enif_fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, \
+ __func__); \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
-#define start_timing() ErlNifTime __start = enif_monotonic_time(ERL_NIF_USEC)
-#define elapsed_microseconds() (enif_monotonic_time(ERL_NIF_USEC) - __start)
#else
#define debug(...)
-#define start_timing()
-#define elapsed_microseconds() 0
#endif
#define error(...) \
do { \
+ enif_fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, \
+ __func__); \
enif_fprintf(stderr, __VA_ARGS__); \
enif_fprintf(stderr, "\n"); \
} while (0)
-#define GET_CTX(env, arg, ctx) \
- do { \
- ExilePriv *data = enif_priv_data(env); \
- if (enif_get_resource(env, arg, data->exec_ctx_rt, (void **)&ctx) == \
- false) { \
- return make_error(env, ATOM_INVALID_CTX); \
- } \
- } while (0);
-
-static const int PIPE_READ = 0;
-static const int PIPE_WRITE = 1;
-static const int PIPE_CLOSED = -1;
-static const int CMD_EXIT = -1;
+#define assert_argc(argc, count) \
+ if (argc != count) { \
+ error("number of arguments must be %d", count); \
+ return enif_make_badarg(env); \
+ }
+
static const int UNBUFFERED_READ = -1;
static const int PIPE_BUF_SIZE = 65535;
-/* We are choosing an exit code which is not reserved see:
- * https://www.tldp.org/LDP/abs/html/exitcodes.html. */
-static const int FORK_EXEC_FAILURE = 125;
+static const int FD_CLOSED = -1;
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_UNDEFINED;
-static ERL_NIF_TERM ATOM_INVALID_CTX;
-static ERL_NIF_TERM ATOM_PIPE_CLOSED;
+static ERL_NIF_TERM ATOM_INVALID_FD;
+static ERL_NIF_TERM ATOM_SELECT_CANCEL_ERROR;
static ERL_NIF_TERM ATOM_EAGAIN;
-static ERL_NIF_TERM ATOM_ALLOC_FAILED;
-
-/* command exit types */
-static ERL_NIF_TERM ATOM_EXIT;
-static ERL_NIF_TERM ATOM_SIGNALED;
-static ERL_NIF_TERM ATOM_STOPPED;
-
-enum exec_status {
- SUCCESS,
- PIPE_CREATE_ERROR,
- PIPE_FLAG_ERROR,
- FORK_ERROR,
- PIPE_DUP_ERROR,
- NULL_DEV_OPEN_ERROR,
-};
-
-enum exit_type { NORMAL_EXIT, SIGNALED, STOPPED };
-
-typedef struct ExilePriv {
- ErlNifResourceType *exec_ctx_rt;
- ErlNifResourceType *io_rt;
-} ExilePriv;
-
-typedef struct ExecContext {
- int cmd_input_fd;
- int cmd_output_fd;
- int exit_status; // can be exit status or signal number depending on exit_type
- enum exit_type exit_type;
- pid_t pid;
- // these are to hold enif_select resource objects
- int *read_resource;
- int *write_resource;
-} ExecContext;
-
-typedef struct StartProcessResult {
- bool success;
- int err;
- ExecContext context;
-} StartProcessResult;
-
-/* TODO: assert if the external process is exit (?) */
-static void exec_ctx_dtor(ErlNifEnv *env, void *obj) {
- ExecContext *ctx = obj;
- enif_release_resource(ctx->read_resource);
- enif_release_resource(ctx->write_resource);
- debug("Exile exec_ctx_dtor called");
-}
-static void exec_ctx_stop(ErlNifEnv *env, void *obj, int fd,
- int is_direct_call) {
- debug("Exile exec_ctx_stop called");
-}
+static ERL_NIF_TERM ATOM_SIGKILL;
+static ERL_NIF_TERM ATOM_SIGTERM;
-static void exec_ctx_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
- ErlNifMonitor *monitor) {
- debug("Exile exec_ctx_down called");
+static void close_fd(int *fd) {
+ if (*fd != FD_CLOSED) {
+ close(*fd);
+ *fd = FD_CLOSED;
+ }
}
-static ErlNifResourceTypeInit exec_ctx_rt_init = {exec_ctx_dtor, exec_ctx_stop,
- exec_ctx_down};
+static int cancel_select(ErlNifEnv *env, int *fd) {
+ int ret;
+
+ if (*fd != FD_CLOSED) {
+ ret = enif_select(env, *fd, ERL_NIF_SELECT_STOP, fd, NULL, ATOM_UNDEFINED);
+ if (ret < 0)
+ perror("cancel_select()");
+
+ return ret;
+ }
+
+ return 0;
+}
static void io_resource_dtor(ErlNifEnv *env, void *obj) {
debug("Exile io_resource_dtor called");
}
static void io_resource_stop(ErlNifEnv *env, void *obj, int fd,
int is_direct_call) {
+ close_fd(&fd);
debug("Exile io_resource_stop called %d", fd);
}
static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
+ int *fd = (int *)obj;
+ cancel_select(env, fd);
debug("Exile io_resource_down called");
}
static ErlNifResourceTypeInit io_rt_init = {io_resource_dtor, io_resource_stop,
io_resource_down};
-static void free_array_of_cstr(char ***arr) {
- int i;
-
- if (*arr) {
- for(i=0; (*arr)[i] != NULL; i++) {
- free((*arr)[i]);
- (*arr)[i] = NULL;
- }
-
- free(*arr);
- }
- *arr = NULL;
-}
-
-static bool read_string(ErlNifEnv *env, ERL_NIF_TERM term, char **str) {
- unsigned int length;
- *str = NULL;
-
- if (enif_get_list_length(env, term, &length) != true) {
- error("failed to get string length");
- return false;
- }
-
- *str = (char *)malloc((length + 1) * sizeof(char));
-
- if (enif_get_string(env, term, *str, length + 1, ERL_NIF_LATIN1) < 1) {
- error("failed to get string");
-
- free(str);
- *str = NULL;
-
- return false;
- }
-
- return true;
-}
-
-static bool erl_list_to_array_of_cstr(ErlNifEnv *env, ERL_NIF_TERM list,
- char ***arr) {
- ERL_NIF_TERM head, tail;
- unsigned int length, i;
-
- *arr = NULL;
-
- if (enif_is_list(env, list) != true) {
- error("erl term is not a list");
- goto error;
- }
-
- if (enif_get_list_length(env, list, &length) != true) {
- error("erl term is not a proper list");
- goto error;
- }
-
- *arr = (char **)malloc((length + 1) * sizeof(char *));
-
- for (i = 0; i < length + 1; i++)
- (*arr)[i] = NULL;
-
- tail = list;
-
- for (i = 0; i < length; i++) {
- if (enif_get_list_cell(env, tail, &head, &tail) != true) {
- error("failed to get cell from list");
- goto error;
- }
-
- if (read_string(env, head, &(*arr)[i]) != true)
- goto error;
- }
-
- return true;
-
-error:
- free_array_of_cstr(arr);
- return false;
-}
+static ErlNifResourceType *FD_RT;
static inline ERL_NIF_TERM make_ok(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_OK, term);
}
static inline ERL_NIF_TERM make_error(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_ERROR, term);
}
-static int set_flag(int fd, int flags) {
- return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
-}
-
-static void close_all(int pipes[2][2]) {
- for (int i = 0; i < 2; i++) {
- if (pipes[i][PIPE_READ] > 0)
- close(pipes[i][PIPE_READ]);
- if (pipes[i][PIPE_WRITE] > 0)
- close(pipes[i][PIPE_WRITE]);
- }
-}
-
/* time is assumed to be in microseconds */
static void notify_consumed_timeslice(ErlNifEnv *env, ErlNifTime start,
ErlNifTime stop) {
ErlNifTime pct;
pct = (ErlNifTime)((stop - start) / 10);
if (pct > 100)
pct = 100;
else if (pct == 0)
pct = 1;
enif_consume_timeslice(env, pct);
}
-/* This is not ideal, but as of now there is no portable way to do this */
-static void close_all_fds() {
- int fd_limit = (int)sysconf(_SC_OPEN_MAX);
- for (int i = STDERR_FILENO + 1; i < fd_limit; i++)
- close(i);
-}
-
-static StartProcessResult start_process(char **args, bool stderr_to_console,
- char *dir, char *const exec_env[]) {
- StartProcessResult result = {.success = false};
- pid_t pid;
- int pipes[2][2] = {{0, 0}, {0, 0}};
-
- if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1) {
- result.err = errno;
- perror("[exile] failed to create pipes");
- close_all(pipes);
- return result;
- }
-
- const int r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
- const int w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
-
- const int r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
- const int w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
-
- if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
- set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
- set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0) {
- result.err = errno;
- perror("[exile] failed to set flags for pipes");
- close_all(pipes);
- return result;
- }
-
- switch (pid = fork()) {
-
- case -1:
- result.err = errno;
- perror("[exile] failed to fork");
- close_all(pipes);
- return result;
-
- case 0: // child
-
- if (dir[0] && chdir(dir) != 0) {
- perror("[exile] failed to change directory");
- _exit(FORK_EXEC_FAILURE);
- }
-
- close(STDIN_FILENO);
- close(STDOUT_FILENO);
-
- if (dup2(r_cmdin, STDIN_FILENO) < 0) {
- perror("[exile] failed to dup to stdin");
-
- /* We are assuming FORK_EXEC_FAILURE exit code wont be used by the command
- * we are running. Technically we can not assume any exit code here. The
- * parent can not differentiate between exit before `exec` and the normal
- * command exit.
- * One correct way to solve this might be to have a separate
- * pipe shared between child and parent and signaling the parent by
- * closing it or writing to it. */
- _exit(FORK_EXEC_FAILURE);
- }
- if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
- perror("[exile] failed to dup to stdout");
- _exit(FORK_EXEC_FAILURE);
- }
-
- if (stderr_to_console != true) {
- close(STDERR_FILENO);
- int dev_null = open("/dev/null", O_WRONLY);
-
- if (dev_null == -1) {
- perror("[exile] failed to open /dev/null");
- _exit(FORK_EXEC_FAILURE);
- }
-
- if (dup2(dev_null, STDERR_FILENO) < 0) {
- perror("[exile] failed to dup stderr");
- _exit(FORK_EXEC_FAILURE);
- }
-
- close(dev_null);
- }
-
- close_all_fds();
-
- execve(args[0], args, exec_env);
- perror("[exile] execvp(): failed");
-
- _exit(FORK_EXEC_FAILURE);
-
- default: // parent
- /* close file descriptors used by child */
- close(r_cmdin);
- close(w_cmdout);
-
- result.success = true;
- result.context.pid = pid;
- result.context.cmd_input_fd = w_cmdin;
- result.context.cmd_output_fd = r_cmdout;
-
- return result;
- }
-}
-
-/* TODO: return appropriate error instead returning generic "badarg" error */
-static ERL_NIF_TERM execute(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ErlNifTime start;
- int stderr_to_console_int;
- ERL_NIF_TERM term;
- char **exec_args, **exec_env;
- char *dir;
- bool stderr_to_console = true;
- struct ExilePriv *data;
- StartProcessResult result;
- ExecContext *ctx = NULL;
-
- exec_args = NULL;
- exec_env = NULL;
- dir = NULL;
-
- start = enif_monotonic_time(ERL_NIF_USEC);
-
- if (argc != 4) {
- error("number of arguments for `execute` must be 4");
- term = enif_make_badarg(env);
- goto exit;
- }
-
- if (erl_list_to_array_of_cstr(env, argv[0], &exec_args) != true) {
- error("failed to read command arguments");
- term = enif_make_badarg(env);
- goto exit;
- }
-
- if (erl_list_to_array_of_cstr(env, argv[1], &exec_env) != true) {
- error("failed to read env list");
- term = enif_make_badarg(env);
- goto exit;
- }
-
- if (read_string(env, argv[2], &dir) != true) {
- error("failed to get `dir`");
- term = enif_make_badarg(env);
- goto exit;
- }
-
- if (enif_get_int(env, argv[3], &stderr_to_console_int) != true) {
- error("failed to read stderr_to_console int");
- term = enif_make_badarg(env);
- goto exit;
- }
- stderr_to_console = stderr_to_console_int == 1 ? true : false;
-
- data = enif_priv_data(env);
- result = start_process(exec_args, stderr_to_console, dir, exec_env);
-
- if (result.success) {
- ctx = enif_alloc_resource(data->exec_ctx_rt, sizeof(ExecContext));
- ctx->cmd_input_fd = result.context.cmd_input_fd;
- ctx->cmd_output_fd = result.context.cmd_output_fd;
- ctx->read_resource = enif_alloc_resource(data->io_rt, sizeof(int));
- ctx->write_resource = enif_alloc_resource(data->io_rt, sizeof(int));
- ctx->pid = result.context.pid;
-
- debug("pid: %d cmd_in_fd: %d cmd_out_fd: %d", ctx->pid, ctx->cmd_input_fd,
- ctx->cmd_output_fd);
-
- term = enif_make_resource(env, ctx);
-
- /* resource should be collected beam GC when there are no more references */
- enif_release_resource(ctx);
-
- notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
-
- term = make_ok(env, term);
- } else {
- term = make_error(env, enif_make_int(env, result.err));
- }
-
-exit:
- free_array_of_cstr(&exec_args);
- free_array_of_cstr(&exec_env);
-
- if (dir)
- free(dir);
-
- return term;
-}
+static int select_write(ErlNifEnv *env, int *fd) {
+ int ret =
+ enif_select(env, *fd, ERL_NIF_SELECT_WRITE, fd, NULL, ATOM_UNDEFINED);
-static int select_write(ErlNifEnv *env, ExecContext *ctx) {
- int retval = enif_select(env, ctx->cmd_input_fd, ERL_NIF_SELECT_WRITE,
- ctx->write_resource, NULL, ATOM_UNDEFINED);
- if (retval != 0)
+ if (ret != 0)
perror("select_write()");
-
- return retval;
+ return ret;
}
-static ERL_NIF_TERM sys_write(ErlNifEnv *env, int argc,
+static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
- if (argc != 2)
- enif_make_badarg(env);
+ assert_argc(argc, 2);
ErlNifTime start;
- start = enif_monotonic_time(ERL_NIF_USEC);
+ ssize_t size;
+ ErlNifBinary bin;
+ int write_errno;
+ int *fd;
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
+ start = enif_monotonic_time(ERL_NIF_USEC);
- if (ctx->cmd_input_fd == PIPE_CLOSED)
- return make_error(env, ATOM_PIPE_CLOSED);
+ if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
+ return make_error(env, ATOM_INVALID_FD);
- ErlNifBinary bin;
if (enif_inspect_binary(env, argv[1], &bin) != true)
return enif_make_badarg(env);
if (bin.size == 0)
return enif_make_badarg(env);
/* should we limit the bin.size here? */
- ssize_t result = write(ctx->cmd_input_fd, bin.data, bin.size);
- int write_errno = errno;
+ size = write(*fd, bin.data, bin.size);
+ write_errno = errno;
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
- /* TODO: branching is ugly, cleanup required */
- if (result >= (ssize_t)bin.size) { // request completely satisfied
- return make_ok(env, enif_make_int(env, result));
- } else if (result >= 0) { // request partially satisfied
- int retval = select_write(env, ctx);
+ if (size >= (ssize_t)bin.size) { // request completely satisfied
+ return make_ok(env, enif_make_int(env, size));
+ } else if (size >= 0) { // request partially satisfied
+ int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
- return make_ok(env, enif_make_int(env, result));
+ return make_ok(env, enif_make_int(env, size));
} else if (write_errno == EAGAIN || write_errno == EWOULDBLOCK) { // busy
- int retval = select_write(env, ctx);
+ int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
- } else { // Error
+ } else {
perror("write()");
return make_error(env, enif_make_int(env, write_errno));
}
}
-static ERL_NIF_TERM sys_close(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
+static int select_read(ErlNifEnv *env, int *fd) {
+ int ret =
+ enif_select(env, *fd, ERL_NIF_SELECT_READ, fd, NULL, ATOM_UNDEFINED);
- int kind;
- enif_get_int(env, argv[1], &kind);
+ if (ret != 0)
+ perror("select_read()");
+ return ret;
+}
- int result;
- switch (kind) {
- case 0:
- if (ctx->cmd_input_fd == PIPE_CLOSED) {
- return ATOM_OK;
- } else {
- enif_select(env, ctx->cmd_input_fd, ERL_NIF_SELECT_STOP,
- ctx->write_resource, NULL, ATOM_UNDEFINED);
- result = close(ctx->cmd_input_fd);
- if (result == 0) {
- ctx->cmd_input_fd = PIPE_CLOSED;
- return ATOM_OK;
- } else {
- perror("cmd_input_fd close()");
- return make_error(env, enif_make_int(env, errno));
- }
- }
- case 1:
- if (ctx->cmd_output_fd == PIPE_CLOSED) {
- return ATOM_OK;
- } else {
- enif_select(env, ctx->cmd_output_fd, ERL_NIF_SELECT_STOP,
- ctx->read_resource, NULL, ATOM_UNDEFINED);
- result = close(ctx->cmd_output_fd);
- if (result == 0) {
- ctx->cmd_output_fd = PIPE_CLOSED;
- return ATOM_OK;
- } else {
- perror("cmd_output_fd close()");
- return make_error(env, enif_make_int(env, errno));
- }
- }
- default:
- debug("invalid file descriptor type");
- return enif_make_badarg(env);
+static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc,
+ const ERL_NIF_TERM argv[]) {
+ assert_argc(argc, 1);
+
+ ERL_NIF_TERM term;
+ ErlNifPid pid;
+ int *fd;
+ int ret;
+
+ fd = enif_alloc_resource(FD_RT, sizeof(int));
+
+ if (!enif_get_int(env, argv[0], fd))
+ goto error_exit;
+
+ if (!enif_self(env, &pid)) {
+ error("failed get self pid");
+ goto error_exit;
}
-}
-static int select_read(ErlNifEnv *env, ExecContext *ctx) {
- int retval = enif_select(env, ctx->cmd_output_fd, ERL_NIF_SELECT_READ,
- ctx->read_resource, NULL, ATOM_UNDEFINED);
- if (retval != 0)
- perror("select_read()");
- return retval;
+ ret = enif_monitor_process(env, fd, &pid, NULL);
+
+ if (ret < 0) {
+ error("no down callback is provided");
+ goto error_exit;
+ } else if (ret > 0) {
+ error("pid is not alive");
+ goto error_exit;
+ }
+
+ term = enif_make_resource(env, fd);
+ enif_release_resource(fd);
+
+ return make_ok(env, term);
+
+error_exit:
+ enif_release_resource(fd);
+ return ATOM_ERROR;
}
-static ERL_NIF_TERM sys_read(ErlNifEnv *env, int argc,
+static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
- if (argc != 2)
- enif_make_badarg(env);
+ assert_argc(argc, 2);
ErlNifTime start;
- start = enif_monotonic_time(ERL_NIF_USEC);
+ int size, demand;
+ int *fd;
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
+ start = enif_monotonic_time(ERL_NIF_USEC);
- if (ctx->cmd_output_fd == PIPE_CLOSED)
- return make_error(env, ATOM_PIPE_CLOSED);
+ if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
+ return make_error(env, ATOM_INVALID_FD);
- int size, request;
+ if (!enif_get_int(env, argv[1], &demand))
+ return enif_make_badarg(env);
- enif_get_int(env, argv[1], &request);
- size = request;
+ size = demand;
- if (request == UNBUFFERED_READ) {
+ if (demand == UNBUFFERED_READ) {
size = PIPE_BUF_SIZE;
- } else if (request < 1) {
- enif_make_badarg(env);
- } else if (request > PIPE_BUF_SIZE) {
+ } else if (demand < 1) {
+ return enif_make_badarg(env);
+ } else if (demand > PIPE_BUF_SIZE) {
size = PIPE_BUF_SIZE;
}
unsigned char buf[size];
- ssize_t result = read(ctx->cmd_output_fd, buf, size);
+ ssize_t result = read(*fd, buf, size);
int read_errno = errno;
ERL_NIF_TERM bin_term = 0;
if (result >= 0) {
/* no need to release this binary */
unsigned char *temp = enif_make_new_binary(env, result, &bin_term);
memcpy(temp, buf, result);
}
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (result >= 0) {
- /* we do not 'select' if request completely satisfied OR EOF OR its
+ /* we do not 'select' if demand completely satisfied OR EOF OR its
* UNBUFFERED_READ */
- if (result == request || result == 0 || request == UNBUFFERED_READ) {
+ if (result == demand || result == 0 || demand == UNBUFFERED_READ) {
return make_ok(env, bin_term);
- } else { // request partially satisfied
- int retval = select_read(env, ctx);
+ } else { // demand partially satisfied
+ int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, bin_term);
}
} else {
if (read_errno == EAGAIN || read_errno == EWOULDBLOCK) { // busy
- int retval = select_read(env, ctx);
+ int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
- } else { // Error
+ } else {
perror("read()");
return make_error(env, enif_make_int(env, read_errno));
}
}
}
-static ERL_NIF_TERM is_alive(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
+static ERL_NIF_TERM nif_close(ErlNifEnv *env, int argc,
+ const ERL_NIF_TERM argv[]) {
+ assert_argc(argc, 1);
- if (ctx->pid == CMD_EXIT)
- return make_ok(env, ATOM_TRUE);
+ int *fd;
- int result = kill(ctx->pid, 0);
+ if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
+ return make_error(env, ATOM_INVALID_FD);
- if (result == 0) {
- return make_ok(env, ATOM_TRUE);
- } else {
- return make_ok(env, ATOM_FALSE);
- }
-}
+ if (cancel_select(env, fd) < 0)
+ return make_error(env, ATOM_SELECT_CANCEL_ERROR);
-static ERL_NIF_TERM sys_terminate(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
- if (ctx->pid == CMD_EXIT)
- return make_ok(env, enif_make_int(env, 0));
+ close_fd(fd);
- return make_ok(env, enif_make_int(env, kill(ctx->pid, SIGTERM)));
+ return ATOM_OK;
}
-static ERL_NIF_TERM sys_kill(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
- if (ctx->pid == CMD_EXIT)
- return make_ok(env, enif_make_int(env, 0));
+static ERL_NIF_TERM nif_is_os_pid_alive(ErlNifEnv *env, int argc,
+ const ERL_NIF_TERM argv[]) {
+ assert_argc(argc, 1);
- return make_ok(env, enif_make_int(env, kill(ctx->pid, SIGKILL)));
-}
+ pid_t pid;
-static ERL_NIF_TERM make_exit_term(ErlNifEnv *env, ExecContext *ctx) {
- switch (ctx->exit_type) {
- case NORMAL_EXIT:
- return make_ok(env, enif_make_tuple2(env, ATOM_EXIT,
- enif_make_int(env, ctx->exit_status)));
- case SIGNALED:
- /* exit_status here points to signal number */
- return make_ok(env, enif_make_tuple2(env, ATOM_SIGNALED,
- enif_make_int(env, ctx->exit_status)));
- case STOPPED:
- return make_ok(env, enif_make_tuple2(env, ATOM_STOPPED,
- enif_make_int(env, ctx->exit_status)));
- default:
- error("Invalid wait status");
- return make_error(env, ATOM_UNDEFINED);
- }
+ if (!enif_get_int(env, argv[0], (int *)&pid))
+ return enif_make_badarg(env);
+
+ int result = kill(pid, 0);
+
+ if (result == 0)
+ return ATOM_TRUE;
+ else
+ return ATOM_FALSE;
}
-static ERL_NIF_TERM sys_wait(ErlNifEnv *env, int argc,
+static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
-
- if (ctx->pid == CMD_EXIT)
- return make_exit_term(env, ctx);
-
- int status = 0;
- int wpid = waitpid(ctx->pid, &status, WNOHANG);
-
- if (wpid == ctx->pid) {
- ctx->pid = CMD_EXIT;
-
- if (WIFEXITED(status)) {
- ctx->exit_type = NORMAL_EXIT;
- ctx->exit_status = WEXITSTATUS(status);
- } else if (WIFSIGNALED(status)) {
- ctx->exit_type = SIGNALED;
- ctx->exit_status = WTERMSIG(status);
- } else if (WIFSTOPPED(status)) {
- ctx->exit_type = STOPPED;
- ctx->exit_status = 0;
- }
+ assert_argc(argc, 2);
- return make_exit_term(env, ctx);
- } else if (wpid != 0) {
- perror("waitpid()");
- }
- ERL_NIF_TERM term = enif_make_tuple2(env, enif_make_int(env, wpid),
- enif_make_int(env, status));
- return make_error(env, term);
-}
+ pid_t pid;
+ int ret;
+
+ // we should not assume pid type to be `int`?
+ if (!enif_get_int(env, argv[0], (int *)&pid))
+ return enif_make_badarg(env);
-static ERL_NIF_TERM os_pid(ErlNifEnv *env, int argc,
- const ERL_NIF_TERM argv[]) {
- ExecContext *ctx = NULL;
- GET_CTX(env, argv[0], ctx);
- if (ctx->pid == CMD_EXIT)
- return make_ok(env, enif_make_int(env, 0));
+ if (enif_compare(argv[1], ATOM_SIGKILL) == 0)
+ ret = kill(pid, SIGKILL);
+ else if (enif_compare(argv[1], ATOM_SIGTERM) == 0)
+ ret = kill(pid, SIGTERM);
+ else
+ return enif_make_badarg(env);
+
+ if (ret != 0) {
+ perror("[exile] failed to send signal");
+ return make_error(
+ env, enif_make_string(env, "failed to send signal", ERL_NIF_LATIN1));
+ }
- return make_ok(env, enif_make_int(env, ctx->pid));
+ return ATOM_OK;
}
static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
- struct ExilePriv *data = enif_alloc(sizeof(struct ExilePriv));
- if (!data)
- return 1;
-
- data->exec_ctx_rt =
- enif_open_resource_type_x(env, "exile_resource", &exec_ctx_rt_init,
- ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
- data->io_rt =
+ FD_RT =
enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
ATOM_TRUE = enif_make_atom(env, "true");
ATOM_FALSE = enif_make_atom(env, "false");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_UNDEFINED = enif_make_atom(env, "undefined");
- ATOM_INVALID_CTX = enif_make_atom(env, "invalid_exile_exec_ctx");
- ATOM_PIPE_CLOSED = enif_make_atom(env, "closed_pipe");
- ATOM_EXIT = enif_make_atom(env, "exit");
- ATOM_SIGNALED = enif_make_atom(env, "signaled");
- ATOM_STOPPED = enif_make_atom(env, "stopped");
+ ATOM_INVALID_FD = enif_make_atom(env, "invalid_fd_resource");
ATOM_EAGAIN = enif_make_atom(env, "eagain");
- ATOM_ALLOC_FAILED = enif_make_atom(env, "alloc_failed");
+ ATOM_SELECT_CANCEL_ERROR = enif_make_atom(env, "select_cancel_error");
- *priv = (void *)data;
+ ATOM_SIGTERM = enif_make_atom(env, "sigterm");
+ ATOM_SIGKILL = enif_make_atom(env, "sigkill");
return 0;
}
static void on_unload(ErlNifEnv *env, void *priv) {
debug("exile unload");
enif_free(priv);
}
static ErlNifFunc nif_funcs[] = {
- {"execute", 4, execute, USE_DIRTY_IO},
- {"sys_write", 2, sys_write, USE_DIRTY_IO},
- {"sys_read", 2, sys_read, USE_DIRTY_IO},
- {"sys_close", 2, sys_close, USE_DIRTY_IO},
- {"sys_terminate", 1, sys_terminate, USE_DIRTY_IO},
- {"sys_wait", 1, sys_wait, USE_DIRTY_IO},
- {"sys_kill", 1, sys_kill, USE_DIRTY_IO},
- {"alive?", 1, is_alive, USE_DIRTY_IO},
- {"os_pid", 1, os_pid, USE_DIRTY_IO},
-};
+ {"nif_read", 2, nif_read, USE_DIRTY_IO},
+ {"nif_create_fd", 1, nif_create_fd, USE_DIRTY_IO},
+ {"nif_write", 2, nif_write, USE_DIRTY_IO},
+ {"nif_close", 1, nif_close, USE_DIRTY_IO},
+ {"nif_is_os_pid_alive", 1, nif_is_os_pid_alive, USE_DIRTY_IO},
+ {"nif_kill", 2, nif_kill, USE_DIRTY_IO}};
ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
diff --git a/c_src/spawner.c b/c_src/spawner.c
new file mode 100644
index 0000000..23e6700
--- /dev/null
+++ b/c_src/spawner.c
@@ -0,0 +1,224 @@
+#ifndef _POSIX_C_SOURCE
+#define _POSIX_C_SOURCE 200809L
+#endif
+
+#include <fcntl.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+// these definitions are Linux only at the moment
+#ifndef CMSG_LEN
+/* Fallback compiled only when the system headers do not define a CMSG_LEN
+ * macro. Evaluates CMSG_DATA on a header placed at address NULL to obtain the
+ * byte offset of the data area inside a cmsghdr, then adds the payload length
+ * `len`. */
+socklen_t CMSG_LEN(size_t len) {
+ return (CMSG_DATA((struct cmsghdr *)NULL) - (unsigned char *)NULL) + len;
+}
+#endif
+
+#ifndef CMSG_SPACE
+/* Fallback compiled only when the system headers do not define a CMSG_SPACE
+ * macro. Builds a msghdr whose control buffer is a single local cmsghdr with
+ * cmsg_len set to CMSG_LEN(len), asks CMSG_NXTHDR where the following header
+ * would start and returns the byte distance between the two headers. */
+socklen_t CMSG_SPACE(size_t len) {
+ struct msghdr msg;
+ struct cmsghdr cmsg;
+ msg.msg_control = &cmsg;
+ msg.msg_controllen =
+ ~0; /* To maximize the chance that CMSG_NXTHDR won't return NULL */
+ cmsg.cmsg_len = CMSG_LEN(len);
+ return (unsigned char *)CMSG_NXTHDR(&msg, &cmsg) - (unsigned char *)&cmsg;
+}
+#endif
+
+// #define DEBUG
+
+#ifdef DEBUG
+#define debug(...) \
+ do { \
+ fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, __func__); \
+ fprintf(stderr, __VA_ARGS__); \
+ fprintf(stderr, "\n"); \
+ } while (0)
+#else
+#define debug(...)
+#endif
+
+#define error(...) \
+ do { \
+ fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, __func__); \
+ fprintf(stderr, __VA_ARGS__); \
+ fprintf(stderr, "\n"); \
+ } while (0)
+
+static const int PIPE_READ = 0;
+static const int PIPE_WRITE = 1;
+
+/* We are choosing an exit code which is not reserved see:
+ * https://www.tldp.org/LDP/abs/html/exitcodes.html. */
+static const int FORK_EXEC_FAILURE = 125;
+
+/*
+ * Turns on the requested flags for `fd`, leaving all other flags untouched.
+ *
+ * `flags` may combine file status flags (e.g. O_NONBLOCK) with O_CLOEXEC.
+ * Close-on-exec is a file *descriptor* flag: F_SETFL ignores O_CLOEXEC, so it
+ * is applied separately through F_SETFD/FD_CLOEXEC.
+ *
+ * Returns 0 on success and -1 (errno set by fcntl) on failure.
+ */
+static int set_flag(int fd, int flags) {
+  int status_flags = flags & ~O_CLOEXEC;
+
+  if (flags & O_CLOEXEC) {
+    int fd_flags = fcntl(fd, F_GETFD);
+
+    if (fd_flags < 0 || fcntl(fd, F_SETFD, fd_flags | FD_CLOEXEC) < 0)
+      return -1;
+  }
+
+  if (status_flags != 0) {
+    int current = fcntl(fd, F_GETFL);
+
+    /* do not OR a failed F_GETFL (-1) into the new flag set */
+    if (current < 0 || fcntl(fd, F_SETFL, current | status_flags) < 0)
+      return -1;
+  }
+
+  return 0;
+}
+
+/*
+ * Sends `read_fd` and `write_fd` to the peer connected on `socket` as one
+ * SCM_RIGHTS ancillary message (in that order).
+ *
+ * A regular data payload has to accompany the ancillary data; it is a
+ * zero-filled 256 byte buffer so no uninitialised stack memory is sent.
+ *
+ * Returns EXIT_SUCCESS when sendmsg() succeeded, EXIT_FAILURE otherwise.
+ */
+static int send_io_fds(int socket, int read_fd, int write_fd) {
+  struct msghdr msg = {0};
+  struct cmsghdr *cmsg;
+  int fds[2];
+  char control[CMSG_SPACE(2 * sizeof(int))];
+  char payload[256];
+  struct iovec io;
+
+  memset(control, '\0', sizeof(control));
+  memset(payload, '\0', sizeof(payload));
+
+  io.iov_base = payload;
+  io.iov_len = sizeof(payload);
+
+  msg.msg_iov = &io;
+  msg.msg_iovlen = 1;
+  msg.msg_control = control;
+  msg.msg_controllen = sizeof(control);
+
+  cmsg = CMSG_FIRSTHDR(&msg);
+  if (cmsg == NULL) {
+    debug("Control buffer is too small for a cmsghdr");
+    return EXIT_FAILURE;
+  }
+
+  cmsg->cmsg_level = SOL_SOCKET;
+  cmsg->cmsg_type = SCM_RIGHTS;
+  cmsg->cmsg_len = CMSG_LEN(2 * sizeof(int));
+
+  fds[0] = read_fd;
+  fds[1] = write_fd;
+
+  memcpy(CMSG_DATA(cmsg), fds, sizeof(fds));
+
+  if (sendmsg(socket, &msg, 0) < 0) {
+    debug("Failed to send message");
+    return EXIT_FAILURE;
+  }
+
+  return EXIT_SUCCESS;
+}
+
+/* Closes the read and write end of both pipes. Entries that are not greater
+ * than zero are skipped. */
+static void close_pipes(int pipes[2][2]) {
+  int idx = 0;
+
+  while (idx < 2) {
+    int read_end = pipes[idx][PIPE_READ];
+    int write_end = pipes[idx][PIPE_WRITE];
+
+    if (read_end > 0)
+      close(read_end);
+
+    if (write_end > 0)
+      close(write_end);
+
+    idx++;
+  }
+}
+
+/*
+ * Creates the stdin and stdout pipes for the command, sends the far ends
+ * (write end of the stdin pipe, read end of the stdout pipe) over `socket`,
+ * connects the near ends to this process' stdin/stdout and then replaces the
+ * process image with `bin`, passing `args` as its argument vector.
+ *
+ * Returns 1 when the pipes could not be created, configured or sent. After the
+ * descriptors were sent it does not return: it either execs `bin` or exits
+ * with FORK_EXEC_FAILURE.
+ */
+static int exec_process(char const *bin, char *const *args, int socket) {
+  int pipes[2][2] = {{0, 0}, {0, 0}};
+  int r_cmdin, w_cmdin, r_cmdout, w_cmdout;
+  long max_fd;
+  long i;
+
+  if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1) {
+    perror("[spawner] failed to create pipes");
+    close_pipes(pipes);
+    return 1;
+  }
+
+  debug("created pipes");
+
+  r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
+  w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
+
+  r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
+  w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
+
+  if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
+      set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
+      set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0) {
+    perror("[spawner] failed to set flags for pipes");
+    close_pipes(pipes);
+    return 1;
+  }
+
+  debug("set fd flags to pipes");
+
+  if (send_io_fds(socket, w_cmdin, r_cmdout) != EXIT_SUCCESS) {
+    perror("[spawner] failed to send fd via socket");
+    close_pipes(pipes);
+    return 1;
+  }
+
+  debug("sent fds over UDS");
+
+  close(STDIN_FILENO);
+  close(w_cmdin);
+  if (dup2(r_cmdin, STDIN_FILENO) < 0) {
+    perror("[spawner] failed to dup to stdin");
+    _exit(FORK_EXEC_FAILURE);
+  }
+
+  close(STDOUT_FILENO);
+  close(r_cmdout);
+  if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
+    perror("[spawner] failed to dup to stdout");
+    _exit(FORK_EXEC_FAILURE);
+  }
+
+  /* Query the limit once instead of on every iteration. sysconf() returns -1
+   * when the limit is indeterminate; the loop would then close nothing and
+   * every inherited descriptor would leak into the command, so fall back to a
+   * conventional default. */
+  max_fd = sysconf(_SC_OPEN_MAX);
+  if (max_fd < 0)
+    max_fd = 1024;
+
+  /* Close all non-standard io fds. Not closing STDERR */
+  for (i = STDERR_FILENO + 1; i < max_fd; i++)
+    close((int)i);
+
+  debug("exec %s", bin);
+
+  execvp(bin, args);
+  perror("[spawner] execvp(): failed");
+
+  _exit(FORK_EXEC_FAILURE);
+}
+
+/*
+ * Connects to the unix domain socket at `socket_path` and hands over to
+ * exec_process(), which sends the pipe descriptors over that socket and execs
+ * `bin` with `args`.
+ *
+ * Returns EXIT_FAILURE when the socket path does not fit into sockaddr_un,
+ * the socket cannot be created or connected, or exec_process() reports a
+ * failure. On success the process image is replaced, so the final return is
+ * not expected to be reached.
+ */
+static int spawn(const char *socket_path, const char *bin, char *const *args) {
+  int socket_fd;
+  struct sockaddr_un socket_addr;
+
+  /* strncpy() below would silently truncate and connect to a different path */
+  if (strlen(socket_path) >= sizeof(socket_addr.sun_path)) {
+    error("socket path is too long");
+    return EXIT_FAILURE;
+  }
+
+  socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+
+  if (socket_fd == -1) {
+    debug("Failed to create socket");
+    return EXIT_FAILURE;
+  }
+
+  debug("created domain socket");
+
+  memset(&socket_addr, 0, sizeof(struct sockaddr_un));
+  socket_addr.sun_family = AF_UNIX;
+  strncpy(socket_addr.sun_path, socket_path, sizeof(socket_addr.sun_path) - 1);
+
+  if (connect(socket_fd, (struct sockaddr *)&socket_addr,
+              sizeof(struct sockaddr_un)) == -1) {
+    debug("Failed to connect to socket");
+    close(socket_fd);
+    return EXIT_FAILURE;
+  }
+
+  debug("connected to exile");
+
+  if (exec_process(bin, args, socket_fd) != 0) {
+    close(socket_fd);
+    return EXIT_FAILURE;
+  }
+
+  // we should never reach here
+  return EXIT_SUCCESS;
+}
+
+/*
+ * Usage: spawner <socket_path> <bin> [args...]
+ *
+ * argv[1] is the unix domain socket to connect to; argv[2..] become the
+ * NULL-terminated argument vector of the command (argv[2] is also the binary
+ * that gets executed). Exits with EXIT_FAILURE when too few arguments are
+ * given, when the argument vector cannot be allocated, or when spawn() fails.
+ */
+int main(int argc, const char *argv[]) {
+  int status, i;
+  const char **exec_argv;
+
+  if (argc < 3) {
+    debug("expected at least 2 arguments, passed %d", argc);
+    exit(EXIT_FAILURE);
+  }
+
+  /* argc - 2 command arguments plus the terminating NULL */
+  exec_argv = malloc((argc - 2 + 1) * sizeof(char *));
+
+  if (exec_argv == NULL) {
+    perror("[spawner] failed to allocate argument vector");
+    exit(EXIT_FAILURE);
+  }
+
+  for (i = 2; i < argc; i++)
+    exec_argv[i - 2] = argv[i];
+
+  exec_argv[i - 2] = NULL;
+
+  debug("socket path: %s bin: %s", argv[1], argv[2]);
+  status = spawn(argv[1], argv[2], (char *const *)exec_argv);
+
+  /* spawn() only comes back here when the command was not exec'd */
+  free(exec_argv);
+  exit(status);
+}
diff --git a/lib/exile.ex b/lib/exile.ex
index 7b1198f..c86ee36 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,57 +1,73 @@
defmodule Exile do
@moduledoc """
Exile is an alternative for beam ports with back-pressure and non-blocking IO
"""
use Application
@doc false
def start(_type, _args) do
opts = [
name: Exile.WatcherSupervisor,
strategy: :one_for_one
]
# we use DynamicSupervisor for cleaning up external processes on
# :init.stop or SIGTERM
DynamicSupervisor.start_link(opts)
end
@doc """
Runs the given command with arguments and return an Enumerable to read command output.
First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
- * `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
- 1. input as Enumerable:
- ```elixir
- # List
- Exile.stream!(~w(bc -q), input: ["1+1\n", "2*2\n"]) |> Enum.to_list()
-
- # Stream
- Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
- ```
- 2. input as collectable:
- If the input in a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. Return value of the function is ignored.
- ```elixir
- Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
- |> Enum.to_list()
- ```
- By defaults no input will be given to the command
- * `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
- * `chunk_size` - Size of each iodata chunk emitted by Enumerable stream. When set to `:unbuffered` the output is unbuffered and chunk size will be variable depending on the amount of data availble at that time. Defaults to 65535
+
+ * `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
+
+ * Enumerable:
+
+ ```
+ # List
+ Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
+ # Stream
+ Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
+ ```
+
+ * Collectable:
+
+ If the input is a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. The return value of the function is ignored.
+
+ ```
+ Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
+ |> Enum.to_list()
+ ```
+
+ By default no input is given to the command
+
+ * `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
+
+ * `chunk_size` - Size of each iodata chunk emitted by the Enumerable stream. When set to `:unbuffered` the output is unbuffered and the chunk size will vary depending on the amount of data available at that time. Defaults to 65535
+
All other options are passed to `Exile.Process.start_link/3`
### Examples
- ``` elixir
+ ```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
"""
+ @type collectable_func() :: (Collectable.t() -> any())
+
+ # The stream is built by Exile.Stream.__build__/2, so the return type refers to Exile.Stream (assumes Exile.Stream defines `t/0` - TODO confirm)
+ @spec stream!(nonempty_list(String.t()),
+ input: Enum.t() | collectable_func(),
+ exit_timeout: timeout(),
+ chunk_size: pos_integer() | :unbuffered
+ ) :: Exile.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index bbc30a6..ee338b3 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,397 +1,510 @@
defmodule Exile.Process do
@moduledoc """
GenServer which wraps spawned external command.
- One should use `Exile.stream!` over `Exile.Process`. stream internally manages this server for you. Use this only if you need more control over the life-cycle OS process.
+ `Exile.stream!/1` should be preferred over this. Use this only if you need more control over the life-cycle of IO streams and OS process.
- ## Overview
- `Exile.Process` is an alternative primitive for Port. It has different interface and approach to running external programs to solve the issues associated with the ports.
+ ## Comparison with Port
- ### When compared to Port
- * it is demand driven. User explicitly has to `read` output of the command and the progress of the external command is controlled using OS pipes. so unlike Port, this never cause memory issues in beam by loading more than we can consume
- * it can close stdin of the program explicitly
- * does not create zombie process. It always tries to cleanup resources
+ * it is demand driven. The user explicitly has to `read` the command output, and the progress of the external command is controlled using OS pipes. Exile never loads more output than the caller can consume, so it should never cause memory issues
+ * it can close stdin while consuming stdout
+ * tries to handle zombie processes by attempting to clean up the external process. Note that there is no middleware involved with exile, so it is still possible to end up with a zombie process.
- At high level it makes non-blocking asynchronous system calls to execute and interact with the external program. It completely bypasses beam implementation for the same using NIF. It uses `select()` system call for asynchronous IO. Most of the system calls are non-blocking, so it does not has adverse effect on scheduler. Issues such as "scheduler collapse".
+ Internally Exile uses non-blocking, asynchronous system calls to interact with the external process. It does not use the port's message-based communication; instead it uses raw stdio and a NIF. Most of the system calls are non-blocking, so they should not block the BEAM schedulers, and dirty schedulers are used for IO.
"""
- alias Exile.ProcessNif
- require Exile.ProcessNif
+ alias Exile.ProcessNif, as: Nif
require Logger
use GenServer
- # delay between exit_check when io is busy (in milliseconds)
- @exit_check_timeout 5
+ defmodule Error do
+ defexception [:message]
+ end
+
+ alias Exile.Process.Error
- @default_opts [stderr_to_console: false, env: []]
+ @default_opts [env: []]
@doc """
Starts `Exile.ProcessServer`
Starts external program using `cmd_with_args` with options `opts`
`cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
- * `cd` - the directory to run the command in
- * `env` - an enumerable of tuples containing environment key-value. These can be accessed in the external program
- * `stderr_to_console` - whether to print stderr output to console. Defaults to `false`
+ * `cd` - the directory to run the command in
+ * `env` - a list of tuples containing environment key-value. These can be accessed in the external program
"""
+ @type process :: pid
+ @spec start_link(nonempty_list(String.t()),
+ cd: String.t(),
+ env: [{String.t(), String.t()}]
+ ) :: {:ok, process} | {:error, any()}
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
with {:ok, args} <- normalize_args(cmd_with_args, opts) do
GenServer.start(__MODULE__, args)
end
end
+ @doc """
+ Closes external program's input stream
+ """
+ @spec close_stdin(process) :: :ok | {:error, any()}
def close_stdin(process) do
GenServer.call(process, :close_stdin, :infinity)
end
+ @doc """
+ Writes `iodata` to the program's input stream. The iodata is converted to a binary before it is sent to the server
+
+ This blocks when the pipe is full
+ """
+ @spec write(process, iodata()) :: :ok | {:error, any()}
def write(process, iodata) do
GenServer.call(process, {:write, IO.iodata_to_binary(iodata)}, :infinity)
end
+ @doc """
+ Returns bytes written by the program to its output stream.
+
+ `size` is either a positive number of bytes or `:unbuffered`; `read/1` is the same as passing `:unbuffered`. This blocks until the program writes and flushes its output, depending on the `size`
+ """
+ @spec read(process, pos_integer() | :unbuffered) ::
+ {:ok, iodata} | {:eof, iodata} | {:error, any()}
def read(process, size) when (is_integer(size) and size > 0) or size == :unbuffered do
GenServer.call(process, {:read, size}, :infinity)
end
def read(process) do
GenServer.call(process, {:read, :unbuffered}, :infinity)
end
+ @doc """
+ Sends `signal` (`:sigkill` or `:sigterm`) to the external program. Returns `{:error, {:exit, status}}` if the program has already exited
+ """
+ @spec kill(process, :sigkill | :sigterm) :: :ok | {:error, any()}
def kill(process, signal) when signal in [:sigkill, :sigterm] do
GenServer.call(process, {:kill, signal}, :infinity)
end
+ @doc """
+ Waits for the program to terminate.
+
+ If the program terminates before `timeout` (milliseconds or `:infinity`), it returns `{:ok, {:exit, exit_status}}` else returns `:timeout`
+ """
+ @spec await_exit(process, timeout()) :: {:ok, {:exit, integer()}} | :timeout
def await_exit(process, timeout \\ :infinity) do
GenServer.call(process, {:await_exit, timeout}, :infinity)
end
+ @doc """
+ Returns the os pid of the command as `{:ok, os_pid}`, `:undefined` when the port reports no os pid, or `{:error, {:exit, status}}` if the program has already exited
+ """
+ @spec os_pid(process) :: {:ok, pos_integer()} | :undefined | {:error, any()}
def os_pid(process) do
GenServer.call(process, :os_pid, :infinity)
end
+ @doc """
+ Stops the exile process, external program will be terminated in the background
+ """
+ @spec stop(process) :: :ok
def stop(process), do: GenServer.call(process, :stop, :infinity)
## Server
defmodule Pending do
+ @moduledoc false
defstruct bin: [], remaining: 0, client_pid: nil
end
defstruct [
:args,
:errno,
+ :port,
+ :socket_path,
+ :stdin,
+ :stdout,
:context,
:status,
await: %{},
pending_read: nil,
pending_write: nil
]
alias __MODULE__
def init(args) do
state = %__MODULE__{
args: args,
errno: nil,
status: :init,
await: %{},
pending_read: %Pending{},
pending_write: %Pending{}
}
{:ok, state, {:continue, nil}}
end
def handle_continue(nil, state) do
- %{cmd_with_args: cmd_with_args, cd: cd, env: env, stderr_to_console: stderr_to_console} =
- state.args
-
- case ProcessNif.execute(cmd_with_args, env, cd, stderr_to_console) do
- {:ok, context} ->
- {:ok, _} = Exile.Watcher.watch(self(), context)
- {:noreply, %Process{state | context: context, status: :start}}
-
- {:error, errno} ->
- raise "Failed to start command: #{cmd_with_args}, errno: #{errno}"
- end
+ Elixir.Process.flag(:trap_exit, true)
+ {:noreply, start_process(state)}
end
def handle_call(:stop, _from, state) do
- # watcher will take care of termination of external process
-
# TODO: pending write and read should receive "stopped" return
# value instead of exit signal
+ case state.status do
+ {:exit, _} -> :ok
+ _ -> Port.close(state.port)
+ end
+
{:stop, :normal, :ok, state}
end
- def handle_call(_, _from, %{status: {:exit, status}}), do: {:reply, {:error, {:exit, status}}}
+ def handle_call(:close_stdin, _from, state) do
+ case state.status do
+ {:exit, _} -> {:reply, :ok, state}
+ _ -> do_close(state, :stdin)
+ end
+ end
+
+ def handle_call({:await_exit, _}, _from, %{status: {:exit, status}} = state) do
+ {:reply, {:ok, {:exit, status}}, state}
+ end
- def handle_call({:await_exit, timeout}, from, state) do
+ def handle_call({:await_exit, timeout}, from, %{status: :start} = state) do
tref =
if timeout != :infinity do
Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
else
nil
end
- state = put_timer(state, from, :timeout, tref)
- check_exit(state, from)
+ {:noreply, %Process{state | await: Map.put(state.await, from, tref)}}
end
- def handle_call({:write, binary}, from, state) when is_binary(binary) do
- pending = %Pending{bin: binary, client_pid: from}
- do_write(%Process{state | pending_write: pending})
+ def handle_call({:read, size}, from, state) do
+ if state.pending_read.client_pid do
+ {:reply, {:error, :pending_read}, state}
+ else
+ pending = %Pending{remaining: size, client_pid: from}
+ do_read(%Process{state | pending_read: pending})
+ end
end
- def handle_call({:read, size}, from, state) do
- pending = %Pending{remaining: size, client_pid: from}
- do_read(%Process{state | pending_read: pending})
+ def handle_call(_, _from, %{status: {:exit, status}} = state) do
+ {:reply, {:error, {:exit, status}}, state}
end
- def handle_call(:close_stdin, _from, state), do: do_close(state, :stdin)
+ def handle_call({:write, binary}, from, state) do
+ cond do
+ !is_binary(binary) ->
+ {:reply, {:error, :not_binary}, state}
- def handle_call(:os_pid, _from, state), do: {:reply, ProcessNif.os_pid(state.context), state}
+ state.pending_write.client_pid ->
+ {:reply, {:error, :pending_write}, state}
- def handle_call({:kill, signal}, _from, state) do
- do_kill(state.context, signal)
- {:reply, :ok, %{state | status: {:exit, :killed}}}
+ true ->
+ pending = %Pending{bin: binary, client_pid: from}
+ do_write(%Process{state | pending_write: pending})
+ end
end
- def handle_info({:check_exit, from}, state), do: check_exit(state, from)
-
- def handle_info({:await_exit_timeout, from}, state) do
- cancel_timer(state, from, :check)
+ def handle_call(:os_pid, _from, state) do
+ case Port.info(state.port, :os_pid) do
+ {:os_pid, os_pid} ->
+ {:reply, {:ok, os_pid}, state}
- receive do
- {:check_exit, ^from} -> :ok
- after
- 0 -> :ok
+ :undefined ->
+ Logger.debug("Process not alive")
+ {:reply, :undefined, state}
end
+ end
+ def handle_call({:kill, signal}, _from, state) do
+ {:reply, signal(state.port, signal), state}
+ end
+
+ def handle_info({:await_exit_timeout, from}, state) do
GenServer.reply(from, :timeout)
- {:noreply, clear_await(state, from)}
+ {:noreply, %Process{state | await: Map.delete(state.await, from)}}
end
def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
def handle_info({:select, _read_resource, _ref, :ready_input}, state), do: do_read(state)
- def handle_info(msg, _state), do: raise(msg)
+ def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state),
+ do: handle_port_exit(exit_status, state)
+
+ def handle_info({:EXIT, port, :normal}, %{port: port} = state), do: {:noreply, state}
+
+ def handle_info({:EXIT, _, reason}, state), do: {:stop, reason, state}
+
+ # Handles the exit-status notification for the external program.
+ #
+ # Every caller currently parked in `state.await` (a map of `from => timer
+ # reference or nil`) is answered with `{:ok, {:exit, exit_status}}` and its
+ # pending timeout timer, if any, is cancelled. The exit status is then
+ # recorded in the state and the waiter map is emptied.
+ defp handle_port_exit(exit_status, state) do
+   Enum.each(state.await, fn {from, tref} ->
+     GenServer.reply(from, {:ok, {:exit, exit_status}})
+
+     if tref do
+       Elixir.Process.cancel_timer(tref)
+     end
+   end)
+
+   # `await: %{}` has to be part of the struct update. Written outside of it
+   # the callback returns `{:noreply, state, [await: %{}]}`: the keyword list
+   # lands in the timeout/hibernate/continue slot of the reply tuple and the
+   # already-answered waiters are never removed from the state.
+   {:noreply, %Process{state | status: {:exit, exit_status}, await: %{}}}
+ end
defp do_write(%Process{pending_write: %Pending{bin: <<>>}} = state) do
GenServer.reply(state.pending_write.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
defp do_write(%Process{pending_write: pending} = state) do
- case ProcessNif.sys_write(state.context, pending.bin) do
+ case Nif.nif_write(state.stdin, pending.bin) do
{:ok, size} ->
if size < byte_size(pending.bin) do
binary = binary_part(pending.bin, size, byte_size(pending.bin) - size)
{:noreply, %{state | pending_write: %Pending{pending | bin: binary}}}
else
GenServer.reply(pending.client_pid, :ok)
{:noreply, %{state | pending_write: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | errno: errno}}
end
end
defp do_read(%Process{pending_read: %Pending{remaining: :unbuffered} = pending} = state) do
- case ProcessNif.sys_read(state.context, -1) do
+ case Nif.nif_read(state.stdout, -1) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, []})
- {:noreply, state}
+ {:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
GenServer.reply(pending.client_pid, {:ok, binary})
- {:noreply, state}
+ {:noreply, %Process{state | pending_read: %Pending{}}}
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
- {:noreply, %{state | errno: errno}}
+ {:noreply, %Process{state | pending_read: %Pending{}, errno: errno}}
end
end
defp do_read(%Process{pending_read: pending} = state) do
- case ProcessNif.sys_read(state.context, pending.remaining) do
+ case Nif.nif_read(state.stdout, pending.remaining) do
{:ok, <<>>} ->
GenServer.reply(pending.client_pid, {:eof, pending.bin})
{:noreply, %Process{state | pending_read: %Pending{}}}
{:ok, binary} ->
if byte_size(binary) < pending.remaining do
pending = %Pending{
pending
| bin: [pending.bin | binary],
remaining: pending.remaining - byte_size(binary)
}
{:noreply, %Process{state | pending_read: pending}}
else
GenServer.reply(pending.client_pid, {:ok, [state.pending_read.bin | binary]})
{:noreply, %Process{state | pending_read: %Pending{}}}
end
{:error, :eagain} ->
{:noreply, state}
{:error, errno} ->
GenServer.reply(pending.client_pid, {:error, errno})
{:noreply, %{state | pending_read: %Pending{}, errno: errno}}
end
end
- defp check_exit(state, from) do
- case ProcessNif.sys_wait(state.context) do
- {:ok, {:exit, ProcessNif.fork_exec_failure()}} ->
- GenServer.reply(from, {:error, :failed_to_execute})
- cancel_timer(state, from, :timeout)
- {:noreply, clear_await(state, from)}
-
- {:ok, status} ->
- GenServer.reply(from, {:ok, status})
- cancel_timer(state, from, :timeout)
- {:noreply, clear_await(state, from)}
-
- {:error, {0, _}} ->
- # Ideally we should not poll and we should handle this with SIGCHLD signal
- tref = Elixir.Process.send_after(self(), {:check_exit, from}, @exit_check_timeout)
- {:noreply, put_timer(state, from, :check, tref)}
-
- {:error, {-1, _}} ->
- GenServer.reply(from, :error)
- cancel_timer(state, from, :timeout)
- {:noreply, clear_await(state, from)}
- end
- end
-
- defp do_kill(context, :sigkill), do: ProcessNif.sys_kill(context)
-
- defp do_kill(context, :sigterm), do: ProcessNif.sys_terminate(context)
-
defp do_close(state, type) do
- case ProcessNif.sys_close(state.context, ProcessNif.to_process_fd(type)) do
+ fd =
+ if type == :stdin do
+ state.stdin
+ else
+ state.stdout
+ end
+
+ case Nif.nif_close(fd) do
:ok ->
{:reply, :ok, state}
{:error, errno} ->
+ # FIXME: correct
raise errno
{:reply, {:error, errno}, %Process{state | errno: errno}}
end
end
- defp clear_await(state, from) do
- %Process{state | await: Map.delete(state.await, from)}
- end
-
- defp cancel_timer(state, from, key) do
- case get_timer(state, from, key) do
- nil -> :ok
- tref -> Elixir.Process.cancel_timer(tref)
- end
- end
-
- defp put_timer(state, from, key, timer) do
- if Map.has_key?(state.await, from) do
- await = put_in(state.await, [from, key], timer)
- %Process{state | await: await}
- else
- %Process{state | await: %{from => %{key => timer}}}
- end
- end
-
- defp get_timer(state, from, key), do: get_in(state.await, [from, key])
-
defp normalize_cmd([cmd | _]) when is_binary(cmd) do
path = System.find_executable(cmd)
if path do
{:ok, to_charlist(path)}
else
{:error, "command not found: #{inspect(cmd)}"}
end
end
defp normalize_cmd(_cmd_with_args) do
{:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
end
defp normalize_cmd_args([_ | args]) do
if is_list(args) && Enum.all?(args, &is_binary/1) do
{:ok, Enum.map(args, &to_charlist/1)}
else
{:error, "command arguments must be list of strings. #{inspect(args)}"}
end
end
defp normalize_cd(nil), do: {:ok, ''}
defp normalize_cd(cd) do
if File.exists?(cd) && File.dir?(cd) do
{:ok, to_charlist(cd)}
else
{:error, "`:cd` must be valid directory path"}
end
end
defp normalize_env(nil), do: {:ok, []}
defp normalize_env(env) do
- user_env =
- Map.new(env, fn {key, value} ->
- {String.trim(key), String.trim(value)}
- end)
-
- # spawned process env will be beam env at that time + user env.
- # this is similar to erlang behavior
- env_list =
- Map.merge(System.get_env(), user_env)
- |> Enum.map(fn {k, v} ->
- to_charlist(k <> "=" <> v)
+ env =
+ Enum.map(env, fn {key, value} ->
+ {to_charlist(key), to_charlist(value)}
end)
- {:ok, env_list}
- end
-
- defp normalize_stderr_to_console(nil), do: {:ok, ProcessNif.nif_false()}
-
- defp normalize_stderr_to_console(term) do
- if term, do: {:ok, ProcessNif.nif_true()}, else: {:ok, ProcessNif.nif_false()}
+ {:ok, env}
end
defp validate_opts_fields(opts) do
- {_, additional_opts} = Keyword.split(opts, [:cd, :stderr_to_console, :env])
+ {_, additional_opts} = Keyword.split(opts, [:cd, :env])
if Enum.empty?(additional_opts) do
:ok
else
{:error, "invalid opts: #{inspect(additional_opts)}"}
end
end
defp normalize_args(cmd_with_args, opts) do
with {:ok, cmd} <- normalize_cmd(cmd_with_args),
{:ok, args} <- normalize_cmd_args(cmd_with_args),
:ok <- validate_opts_fields(opts),
{:ok, cd} <- normalize_cd(opts[:cd]),
- {:ok, stderr_to_console} <- normalize_stderr_to_console(opts[:stderr_to_console]),
{:ok, env} <- normalize_env(opts[:env]) do
- {:ok,
- %{cmd_with_args: [cmd | args], cd: cd, stderr_to_console: stderr_to_console, env: env}}
+ {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env}}
+ end
+ end
+
+ @spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
+
+ # Opens a port running the `spawner` executable located at `@spawner_path`.
+ #
+ # The spawner receives `socket_path` as its first argument followed by the
+ # command and its arguments (`cmd_with_args`). `:cd` and `:env` are passed
+ # to `Port.open/2` only when the corresponding value is truthy. The port is
+ # opened with `:nouse_stdio`, `:exit_status` and `:binary`.
+ #
+ # Returns the opened port.
+ defp exec(cmd_with_args, socket_path, env, cd) do
+   opts = []
+   # Fall back to the accumulated `opts` (not a fresh `[]`) so that an option
+   # added above this line is never silently discarded.
+   opts = if cd, do: [{:cd, cd} | opts], else: opts
+   opts = if env, do: [{:env, env} | opts], else: opts
+
+   opts =
+     [
+       :nouse_stdio,
+       :exit_status,
+       :binary,
+       args: [socket_path | cmd_with_args]
+     ] ++ opts
+
+   Port.open({:spawn_executable, @spawner_path}, opts)
+ end
+
+ defp socket_path do
+ str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
+ path = Path.join(System.tmp_dir!(), str)
+ _ = :file.delete(path)
+ path
+ end
+
+ defp signal(port, sig) when sig in [:sigkill, :sigterm] do
+ case Port.info(port, :os_pid) do
+ {:os_pid, os_pid} -> Nif.nif_kill(os_pid, sig)
+ :undefined -> {:error, :process_not_alive}
+ end
+ end
+
+ defp start_process(%{args: %{cmd_with_args: cmd_with_args, cd: cd, env: env}} = state) do
+ path = socket_path()
+ {:ok, sock} = :socket.open(:local, :stream, :default)
+
+ try do
+ {:ok, _} = :socket.bind(sock, %{family: :local, path: path})
+ :ok = :socket.listen(sock)
+
+ port = exec(cmd_with_args, path, env, cd)
+ {:os_pid, os_pid} = Port.info(port, :os_pid)
+ Exile.Watcher.watch(self(), os_pid, path)
+
+ {stdout, stdin} = receive_fds(sock)
+
+ %Process{
+ state
+ | port: port,
+ status: :start,
+ socket_path: path,
+ stdin: stdin,
+ stdout: stdout
+ }
+ after
+ :socket.close(sock)
+ end
+ end
+
+ @socket_timeout 2000
+ defp receive_fds(lsock) do
+ {:ok, sock} = :socket.accept(lsock, @socket_timeout)
+
+ try do
+ case :socket.recvmsg(sock, @socket_timeout) do
+ {:ok, msg} ->
+ %{
+ ctrl: [
+ %{
+ data: <<stdin_fd_int::native-32, stdout_fd_int::native-32, _::binary>>,
+ level: :socket,
+ type: :rights
+ }
+ ]
+ } = msg
+
+ with {:ok, stdout} <- Nif.nif_create_fd(stdout_fd_int),
+ {:ok, stdin} <- Nif.nif_create_fd(stdin_fd_int) do
+ {stdout, stdin}
+ else
+ error ->
+ raise Error,
+ message: "Failed to create fd resources\n error: #{inspect(error)}"
+ end
+
+ {:error, reason} ->
+ raise Error,
+ message:
+ "Failed to receive stdin and stdout file descriptors\n error: #{inspect(reason)}"
+ end
+ after
+ :socket.close(sock)
end
end
end
diff --git a/lib/exile/process_nif.ex b/lib/exile/process_nif.ex
index aa10f57..7607274 100644
--- a/lib/exile/process_nif.ex
+++ b/lib/exile/process_nif.ex
@@ -1,37 +1,21 @@
defmodule Exile.ProcessNif do
@moduledoc false
@on_load :load_nifs
def load_nifs do
nif_path = :filename.join(:code.priv_dir(:exile), "exile")
:erlang.load_nif(nif_path, 0)
end
- def execute(_cmd, _dir, _env, _stderr_to_console),
- do: :erlang.nif_error(:nif_library_not_loaded)
+ def nif_is_os_pid_alive(_os_pid), do: :erlang.nif_error(:nif_library_not_loaded)
- def sys_write(_context, _bin), do: :erlang.nif_error(:nif_library_not_loaded)
+ def nif_kill(_os_pid, _signal), do: :erlang.nif_error(:nif_library_not_loaded)
- def sys_read(_context, _bytes), do: :erlang.nif_error(:nif_library_not_loaded)
+ def nif_read(_fd, _request), do: :erlang.nif_error(:nif_library_not_loaded)
- def sys_close(_context, _pipe), do: :erlang.nif_error(:nif_library_not_loaded)
+ def nif_create_fd(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
- def sys_kill(_context), do: :erlang.nif_error(:nif_library_not_loaded)
+ def nif_close(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
- def sys_terminate(_context), do: :erlang.nif_error(:nif_library_not_loaded)
-
- def sys_wait(_context), do: :erlang.nif_error(:nif_library_not_loaded)
-
- def os_pid(_context), do: :erlang.nif_error(:nif_library_not_loaded)
-
- def alive?(_context), do: :erlang.nif_error(:nif_library_not_loaded)
-
- # non-nif helper functions
- defmacro fork_exec_failure(), do: 125
-
- defmacro nif_false(), do: 0
- defmacro nif_true(), do: 1
-
- def to_process_fd(:stdin), do: 0
- def to_process_fd(:stdout), do: 1
+ def nif_write(_fd, _bin), do: :erlang.nif_error(:nif_library_not_loaded)
end
diff --git a/lib/exile/watcher.ex b/lib/exile/watcher.ex
index a4cab72..7e7def9 100644
--- a/lib/exile/watcher.ex
+++ b/lib/exile/watcher.ex
@@ -1,81 +1,79 @@
defmodule Exile.Watcher do
+ @moduledoc false
+
use GenServer, restart: :temporary
require Logger
- alias Exile.ProcessNif
+ alias Exile.ProcessNif, as: Nif
def start_link(args) do
{:ok, _pid} = GenServer.start_link(__MODULE__, args)
end
- def watch(pid, context) do
- spec = {Exile.Watcher, %{pid: pid, context: context}}
+ def watch(pid, os_pid, socket_path) do
+ spec = {Exile.Watcher, %{pid: pid, os_pid: os_pid, socket_path: socket_path}}
DynamicSupervisor.start_child(Exile.WatcherSupervisor, spec)
end
def init(args) do
- %{pid: pid, context: context} = args
+ %{pid: pid, os_pid: os_pid, socket_path: socket_path} = args
Process.flag(:trap_exit, true)
ref = Elixir.Process.monitor(pid)
- {:ok, %{pid: pid, context: context, ref: ref}}
+ {:ok, %{pid: pid, os_pid: os_pid, socket_path: socket_path, ref: ref}}
end
- def handle_info({:DOWN, ref, :process, pid, _reason}, %{pid: pid, context: context, ref: ref}) do
- attempt_graceful_exit(context)
+ def handle_info({:DOWN, ref, :process, pid, _reason}, %{
+ pid: pid,
+ socket_path: socket_path,
+ os_pid: os_pid,
+ ref: ref
+ }) do
+ File.rm!(socket_path)
+ attempt_graceful_exit(os_pid)
{:stop, :normal, nil}
end
def handle_info({:EXIT, _, reason}, nil), do: {:stop, reason, nil}
# when watcher is attempted to be killed, we forcefully kill external os process.
# This can happen when beam receive SIGTERM
- def handle_info({:EXIT, _, reason}, %{pid: pid, context: context}) do
+ def handle_info({:EXIT, _, reason}, %{pid: pid, socket_path: socket_path, os_pid: os_pid}) do
Logger.debug(fn -> "Watcher exiting. reason: #{inspect(reason)}" end)
+ File.rm!(socket_path)
Exile.Process.stop(pid)
- attempt_graceful_exit(context)
+ attempt_graceful_exit(os_pid)
{:stop, reason, nil}
end
- # for proper process exit parent of the child *must* wait() for
- # child processes termination exit and "pickup" after the exit
- # (receive child exit_status). Resources acquired by child such as
- # file descriptors won't be released even if the child process
- # itself is terminated.
- defp attempt_graceful_exit(context) do
+ defp attempt_graceful_exit(os_pid) do
try do
Logger.debug(fn -> "Stopping external program" end)
- # sys_close is idempotent, calling it multiple times is okay
- ProcessNif.sys_close(context, ProcessNif.to_process_fd(:stdin))
- ProcessNif.sys_close(context, ProcessNif.to_process_fd(:stdout))
-
# at max we wait for 100ms for program to exit
- process_exit?(context, 100) && throw(:done)
+ process_exit?(os_pid, 100) && throw(:done)
Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
- ProcessNif.sys_terminate(context)
- process_exit?(context, 100) && throw(:done)
+ Nif.nif_kill(os_pid, :sigterm)
+ process_exit?(os_pid, 100) && throw(:done)
Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
- ProcessNif.sys_kill(context)
- process_exit?(context, 1000) && throw(:done)
+ Nif.nif_kill(os_pid, :sigkill)
+ process_exit?(os_pid, 200) && throw(:done)
- Logger.error("[exile] failed to kill external process")
+ Logger.error("failed to kill external process")
raise "Failed to kill external process"
catch
:done -> Logger.debug(fn -> "External program exited successfully" end)
end
end
- defp process_exit?(context) do
- match?({:ok, _}, ProcessNif.sys_wait(context))
- end
+ defp process_exit?(os_pid), do: !Nif.nif_is_os_pid_alive(os_pid)
- defp process_exit?(context, timeout) do
- if process_exit?(context) do
+ defp process_exit?(os_pid, timeout) do
+ if process_exit?(os_pid) do
true
else
:timer.sleep(timeout)
- process_exit?(context)
+ process_exit?(os_pid)
end
end
end
diff --git a/mix.exs b/mix.exs
index ffb9f00..fcb71d2 100644
--- a/mix.exs
+++ b/mix.exs
@@ -1,31 +1,56 @@
defmodule Exile.MixProject do
use Mix.Project
def project do
[
app: :exile,
version: "0.1.0",
elixir: "~> 1.7",
start_permanent: Mix.env() == :prod,
compilers: [:elixir_make] ++ Mix.compilers(),
make_targets: ["all"],
make_clean: ["clean"],
- deps: deps()
+ deps: deps(),
+
+ # Package
+ package: package(),
+ description: description(),
+
+ # Docs
+ source_url: "https://github.com/akash-akya/exile",
+ homepage_url: "https://github.com/akash-akya/exile",
+ docs: [
+ main: "readme",
+ extras: ["README.md"]
+ ]
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
mod: {Exile, []},
extra_applications: [:logger]
]
end
+ defp description do
+ "NIF based solution to interact with external programs with back-pressure"
+ end
+
+ defp package do
+ [
+ maintainers: ["Akash Hiremath"],
+ licenses: ["MIT"],
+ links: %{GitHub: "https://github.com/akash-akya/exile"}
+ ]
+ end
+
# Run "mix help deps" to learn about dependencies.
defp deps do
[
- {:elixir_make, "~> 0.6", runtime: false}
+ {:elixir_make, "~> 0.6", runtime: false},
+ {:ex_doc, ">= 0.0.0", only: :dev}
]
end
end
diff --git a/mix.lock b/mix.lock
index a7f64e0..868c03b 100644
--- a/mix.lock
+++ b/mix.lock
@@ -1,3 +1,9 @@
%{
- "elixir_make": {:hex, :elixir_make, "0.6.0", "38349f3e29aff4864352084fc736fa7fa0f2995a819a737554f7ebd28b85aaab", [:mix], [], "hexpm", "d522695b93b7f0b4c0fcb2dfe73a6b905b1c301226a5a55cb42e5b14d509e050"}
+ "earmark_parser": {:hex, :earmark_parser, "1.4.12", "b245e875ec0a311a342320da0551da407d9d2b65d98f7a9597ae078615af3449", [:mix], [], "hexpm", "711e2cc4d64abb7d566d43f54b78f7dc129308a63bc103fbd88550d2174b3160"},
+ "elixir_make": {:hex, :elixir_make, "0.6.0", "38349f3e29aff4864352084fc736fa7fa0f2995a819a737554f7ebd28b85aaab", [:mix], [], "hexpm", "d522695b93b7f0b4c0fcb2dfe73a6b905b1c301226a5a55cb42e5b14d509e050"},
+ "ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"},
+ "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
+ "makeup_elixir": {:hex, :makeup_elixir, "0.15.0", "98312c9f0d3730fde4049985a1105da5155bfe5c11e47bdc7406d88e01e4219b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "75ffa34ab1056b7e24844c90bfc62aaf6f3a37a15faa76b07bc5eba27e4a8b4a"},
+ "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
+ "temp": {:hex, :temp, "0.4.7", "2c78482cc2294020a4bc0c95950b907ff386523367d4e63308a252feffbea9f2", [:mix], [], "hexpm", "6af19e7d6a85a427478be1021574d1ae2a1e1b90882586f06bde76c63cd03e0d"},
}
diff --git a/test/exile/process_nif_test.exs b/test/exile/process_nif_test.exs
index 2358c0d..19c2584 100644
--- a/test/exile/process_nif_test.exs
+++ b/test/exile/process_nif_test.exs
@@ -1,10 +1,3 @@
defmodule Exile.ProcessNifTest do
use ExUnit.Case, async: false
- alias Exile.ProcessNif
-
- test "exit before exec" do
- {:ok, ctx} = ProcessNif.execute(['invalid'], [], '', 0)
- :timer.sleep(500)
- assert {:ok, {:exit, 125}} = ProcessNif.sys_wait(ctx)
- end
end
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index 890a520..ab923e1 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,333 +1,366 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
alias Exile.Process
test "read" do
{:ok, s} = Process.start_link(~w(echo test))
assert {:eof, iodata} = Process.read(s, 100)
assert IO.iodata_to_binary(iodata) == "test\n"
assert :ok == Process.close_stdin(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "write" do
{:ok, s} = Process.start_link(~w(cat))
assert :ok == Process.write(s, "hello")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "hello"
assert :ok == Process.write(s, "world")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "world"
assert :ok == Process.close_stdin(s)
assert {:eof, []} == Process.read(s)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
end
test "stdin close" do
logger = start_events_collector()
# base64 produces output only after getting EOF from stdin. we
# collect events in order and assert that we can still read from
# stdout even after closing stdin
{:ok, s} = Process.start_link(~w(base64))
# parallel reader should be blocked till we close stdin
start_parallel_reader(s, logger)
:timer.sleep(100)
assert :ok == Process.write(s, "hello")
add_event(logger, {:write, "hello"})
assert :ok == Process.write(s, "world")
add_event(logger, {:write, "world"})
:timer.sleep(100)
assert :ok == Process.close_stdin(s)
add_event(logger, :input_close)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
Process.stop(s)
assert [
{:write, "hello"},
{:write, "world"},
:input_close,
{:read, "aGVsbG93b3JsZA==\n"},
:eof
] == get_events(logger)
end
test "external command termination on stop" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
:timer.sleep(100)
refute os_process_alive?(os_pid)
end
test "external command kill on stop" do
- # cat command hangs waiting for EOF
{:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
Process.stop(s)
if os_process_alive?(os_pid) do
- :timer.sleep(3000)
+ :timer.sleep(1000)
refute os_process_alive?(os_pid)
else
:ok
end
end
test "exit status" do
- {:ok, s} = Process.start_link(~w(sh -c "exit 2"))
- assert {:ok, {:exit, 2}} == Process.await_exit(s, 500)
+ {:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
+ assert {:ok, {:exit, 10}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "writing binary larger than pipe buffer size" do
large_bin = generate_binary(5 * 65535)
{:ok, s} = Process.start_link(~w(cat))
writer =
Task.async(fn ->
Process.write(s, large_bin)
Process.close_stdin(s)
end)
:timer.sleep(100)
{_, iodata} = Process.read(s, 5 * 65535)
Task.await(writer)
assert IO.iodata_length(iodata) == 5 * 65535
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
end
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
{:ok, s} = Process.start_link(~w(cat))
large_bin = generate_binary(65535 * 5)
writer =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.write(s, large_bin)
add_event(logger, {:write, i})
end)
Process.close_stdin(s)
end)
:timer.sleep(50)
reader =
Task.async(fn ->
Enum.each(1..10, fn i ->
Process.read(s, 5 * 65535)
add_event(logger, {:read, i})
# delay in reading should delay writes
:timer.sleep(10)
end)
end)
Task.await(writer)
Task.await(reader)
assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
Process.stop(s)
assert [
write: 1,
read: 1,
write: 2,
read: 2,
write: 3,
read: 3,
write: 4,
read: 4,
write: 5,
read: 5,
write: 6,
read: 6,
write: 7,
read: 7,
write: 8,
read: 8,
write: 9,
read: 9,
write: 10,
read: 10
] == get_events(logger)
end
# this test does not work properly in linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
Process.stop(s)
open_files = parse_lsof(bin)
assert [%{fd: "0", name: _, type: "PIPE"}, %{type: "PIPE", fd: "1", name: _}] = open_files
end
test "process kill with pending write" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
large_data =
Stream.cycle(["test"]) |> Stream.take(500_000) |> Enum.to_list() |> IO.iodata_to_binary()
task =
Task.async(fn ->
try do
Process.write(s, large_data)
catch
:exit, reason -> reason
end
end)
:timer.sleep(200)
Process.stop(s)
:timer.sleep(3000)
refute os_process_alive?(os_pid)
assert {:normal, _} = Task.await(task)
end
+ test "concurrent read" do
+ {:ok, s} = Process.start_link(~w(cat))
+
+ task = Task.async(fn -> Process.read(s, 1) end)
+
+ # delaying concurrent read to avoid race-condition
+ Elixir.Process.sleep(100)
+ assert {:error, :pending_read} = Process.read(s, 1)
+
+ assert :ok == Process.close_stdin(s)
+ assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
+ Process.stop(s)
+ _ = Task.await(task)
+ end
+
test "cd" do
parent = Path.expand("..", File.cwd!())
{:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
{:ok, dir} = Process.read(s)
assert String.trim(dir) == parent
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "invalid path" do
assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
end
test "invalid opt" do
assert {:error, "invalid opts: [invalid: :test]"} =
Process.start_link(~w(cat), invalid: :test)
end
test "env" do
assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
assert {:ok, "test\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "if external process inherits beam env" do
:ok = System.put_env([{"BEAM_ENV_A", "10"}])
assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
assert {:ok, "10\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
test "if user env overrides beam env" do
:ok = System.put_env([{"BEAM_ENV", "base"}])
assert {:ok, s} =
Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
assert {:ok, "overridden\n"} = Process.read(s)
assert {:ok, {:exit, 0}} = Process.await_exit(s)
Process.stop(s)
end
+ test "await_exit when process is stopped" do
+ assert {:ok, s} = Process.start_link(~w(cat))
+
+ tasks =
+ Enum.map(1..10, fn _ ->
+ Task.async(fn -> Process.await_exit(s) end)
+ end)
+
+ assert :ok == Process.close_stdin(s)
+
+ Elixir.Process.sleep(100)
+
+ Enum.each(tasks, fn task ->
+ assert {:ok, {:exit, 0}} = Task.await(task)
+ end)
+
+ Process.stop(s)
+ end
+
def start_parallel_reader(proc_server, logger) do
spawn_link(fn -> reader_loop(proc_server, logger) end)
end
def reader_loop(proc_server, logger) do
case Process.read(proc_server) do
{:ok, data} ->
add_event(logger, {:read, data})
reader_loop(proc_server, logger)
{:eof, []} ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
Stream.repeatedly(fn -> "A" end) |> Enum.take(size) |> IO.iodata_to_binary()
end
end
diff --git a/test/exile/watcher_test.exs b/test/exile/watcher_test.exs
new file mode 100644
index 0000000..fc55ba0
--- /dev/null
+++ b/test/exile/watcher_test.exs
@@ -0,0 +1,55 @@
+defmodule Exile.WatcherTest do
+ use ExUnit.Case, async: true
+ alias Exile.Process
+
+ test "uds path socket cleanup after successful exit" do
+ {:ok, s} = Process.start_link(~w(cat))
+ %{socket_path: socket_path} = :sys.get_state(s)
+
+ assert File.exists?(socket_path)
+
+ :ok = Process.close_stdin(s)
+ Elixir.Process.sleep(100)
+
+ {:ok, {:exit, 0}} = Process.await_exit(s)
+ :ok = Process.stop(s)
+
+ Elixir.Process.sleep(100)
+
+ refute File.exists?(socket_path)
+ end
+
+ test "external process and uds path cleanup on error" do
+ {:ok, s} = Process.start_link(~w(cat))
+ %{socket_path: socket_path} = :sys.get_state(s)
+ {:ok, os_pid} = Process.os_pid(s)
+
+ assert File.exists?(socket_path)
+ assert os_process_alive?(os_pid)
+
+ Elixir.Process.exit(s, :kill)
+ Elixir.Process.sleep(100)
+
+ refute File.exists?(socket_path)
+ refute os_process_alive?(os_pid)
+ end
+
+ test "if external process is killed with SIGTERM" do
+ {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+
+ {:ok, os_pid} = Process.os_pid(s)
+ assert os_process_alive?(os_pid)
+ Process.stop(s)
+
+ :timer.sleep(1000)
+ refute os_process_alive?(os_pid)
+ end
+
+ defp os_process_alive?(pid) do
+ match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
+ end
+
+ defp fixture(script) do
+ Path.join([__DIR__, "../scripts", script])
+ end
+end

File Metadata

Mime Type
text/x-diff
Expires
Thu, Nov 28, 6:31 AM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40939
Default Alt Text
(96 KB)

Event Timeline