Page MenuHomePhorge

No OneTemporary

Size
157 KB
Referenced Files
None
Subscribers
None
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index fa418dc..199960c 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -1,35 +1,86 @@
-name: CI
+name: Elixir CI
on:
- push
- pull_request
jobs:
test:
- runs-on: ubuntu-latest
+ runs-on: ubuntu-20.04
name: Test - Elixir ${{matrix.elixir}} / OTP ${{matrix.otp}}
strategy:
matrix:
include:
- elixir: 1.10.x
otp: 22.x
- elixir: 1.12.x
otp: 23.x
- elixir: 1.14.x
otp: 24.x
steps:
- uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}
- uses: actions/checkout@v3
+ - name: Cache Dependencies
+ id: mix-cache
+ uses: actions/cache@v3
+ with:
+ path: |
+ deps
+ _build
+ key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }}
+
+ - name: Install Dependencies
+ if: steps.mix-cache.outputs.cache-hit != 'true'
+ run: |
+ mix deps.get
+ mix deps.compile
+
+ - run: gcc --version
+ - run: mix compile --warnings-as-errors
+ - run: mix test --exclude skip:true --trace
+
+ lint:
+ runs-on: ubuntu-22.04
+ name: Lint
+ strategy:
+ matrix:
+ include:
+ - elixir: 1.14.x
+ otp: 25.x
+ steps:
+ - uses: erlef/setup-beam@v1
+ with:
+ otp-version: ${{matrix.otp}}
+ elixir-version: ${{matrix.elixir}}
+
+ - uses: actions/checkout@v3
+
+ - name: Cache Dependencies
+ id: mix-cache
+ uses: actions/cache@v3
+ with:
+ path: |
+ deps
+ _build
+ key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }}
+
+ - name: Install Dependencies
+ if: steps.mix-cache.outputs.cache-hit != 'true'
+ run: |
+ mkdir -p priv/plts
+ mix deps.get
+ mix deps.compile
+ mix dialyzer --plt
+
- run: mix deps.get
- run: mix deps.unlock --check-unused
- run: mix format --check-formatted
- - run: gcc --version
- - run: mix compile --force --warnings-as-errors
- - run: mix test --exclude skip:true --trace
+ - run: mix credo --strict
+ - run: mix dialyzer --plt
diff --git a/Makefile b/Makefile
index b1f9721..466c836 100644
--- a/Makefile
+++ b/Makefile
@@ -1,28 +1,27 @@
calling_from_make:
mix compile
UNAME := $(shell uname)
-CFLAGS ?= -D_POSIX_C_SOURCE=200809L -Wall -Werror -Wno-unused-parameter -pedantic -std=c99 -O2
+CFLAGS ?= -D_POSIX_C_SOURCE=200809L -Wall -Werror -Wno-unused-parameter -pedantic -std=c99 -O2 -fsanitize=undefined
ifeq ($(UNAME), Darwin)
TARGET_CFLAGS ?= -fPIC -undefined dynamic_lookup -dynamiclib -Wextra
endif
ifeq ($(UNAME), Linux)
TARGET_CFLAGS ?= -fPIC -shared
endif
-
all: priv/exile.so priv/spawner
priv/exile.so: c_src/exile.c
mkdir -p priv
$(CC) -I$(ERL_INTERFACE_INCLUDE_DIR) $(TARGET_CFLAGS) $(CFLAGS) c_src/exile.c -o priv/exile.so
priv/spawner: c_src/spawner.c
mkdir -p priv
$(CC) $(CFLAGS) c_src/spawner.c -o priv/spawner
clean:
@rm -rf priv/exile.so priv/spawner
diff --git a/README.md b/README.md
index 7775a2a..043ca3b 100644
--- a/README.md
+++ b/README.md
@@ -1,94 +1,202 @@
# Exile
[![CI](https://github.com/akash-akya/exile/actions/workflows/ci.yaml/badge.svg)](https://github.com/akash-akya/exile/actions/workflows/ci.yaml)
[![Hex.pm](https://img.shields.io/hexpm/v/exile.svg)](https://hex.pm/packages/exile)
[![docs](https://img.shields.io/badge/docs-hexpm-blue.svg)](https://hexdocs.pm/exile/)
Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure, non-blocking io, and tries to fix ports issues.
Exile is built around the idea of having demand-driven, asynchronous interaction with an external process. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses a NIF. See [Rationale](#rationale) for details. It also provides a stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as
``` elixir
-Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
+Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
See `Exile.stream!/2` module doc for more details about handling stderr and other options.
`Exile.stream!/2` is a convenience wrapper around `Exile.Process`. Prefer using `Exile.stream!` over using `Exile.Process` directly.
Exile requires OTP v22.1 and above.
Exile is based on a NIF; please be aware of the consequences of that before using Exile. For basic use cases, use [ExCmd](https://github.com/akash-akya/ex_cmd) instead.
+
## Installation
```elixir
def deps do
[
{:exile, "~> x.x.x"}
]
end
```
+
+## Quick Start
+
+ Run a command and read from stdout
+
+ ```
+ iex> Exile.stream!(~w(echo Hello))
+ ...> |> Enum.into("") # collect as string
+ "Hello\n"
+ ```
+
+ Run a command with list of strings as input
+
+ ```
+ iex> Exile.stream!(~w(cat), input: ["Hello", " ", "World"])
+ ...> |> Enum.into("") # collect as string
+ "Hello World"
+ ```
+
+ Run a command with input as Stream
+
+ ```
+ iex> input_stream = Stream.map(1..10, fn num -> "#{num} " end)
+ iex> Exile.stream!(~w(cat), input: input_stream)
+ ...> |> Enum.into("")
+ "1 2 3 4 5 6 7 8 9 10 "
+ ```
+
+ Run a command with input as infinite stream
+
+ ```
+ # create infinite stream
+ iex> input_stream = Stream.repeatedly(fn -> "A" end)
+ iex> binary =
+ ...> Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true) # we need to ignore epipe since we are terminating the program before the input completes
+ ...> |> Stream.take(2) # we must limit since the input stream is infinite
+ ...> |> Enum.into("")
+ iex> is_binary(binary)
+ true
+ iex> "AAAAA" <> _ = binary
+ ```
+
+ Run a command with input Collectable
+
+ ```
+ # Exile calls the callback with a sink where the process can push the data
+ iex> Exile.stream!(~w(cat), input: fn sink ->
+ ...> Stream.map(1..10, fn num -> "#{num} " end)
+ ...> |> Stream.into(sink) # push to the external process
+ ...> |> Stream.run()
+ ...> end)
+ ...> |> Stream.take(100) # we must limit since the input stream is infinite
+ ...> |> Enum.into("")
+ "1 2 3 4 5 6 7 8 9 10 "
+ ```
+
+ When the command waits for the input stream to close
+
+ ```
+ # base64 command wait for the input to close and writes data to stdout at once
+ iex> Exile.stream!(~w(base64), input: ["abcdef"])
+ ...> |> Enum.into("")
+ "YWJjZGVm\n"
+ ```
+
+ When the command exits with an error
+
+ ```
+ iex> Exile.stream!(["sh", "-c", "exit 4"])
+ ...> |> Enum.into("")
+ ** (Exile.Process.Error) command exited with status: 4
+ ```
+
+ With `max_chunk_size` set
+
+ ```
+ iex> data =
+ ...> Exile.stream!(~w(cat /dev/urandom), max_chunk_size: 100, ignore_epipe: true)
+ ...> |> Stream.take(5)
+ ...> |> Enum.into("")
+ iex> byte_size(data)
+ 500
+ ```
+
+ When input and output run at different rate
+
+ ```
+ iex> input_stream = Stream.map(1..1000, fn num -> "X #{num} X\n" end)
+ iex> Exile.stream!(~w(grep 250), input: input_stream)
+ ...> |> Enum.into("")
+ "X 250 X\n"
+ ```
+
+ With stderr enabled
+
+ ```
+ iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], enable_stderr: true)
+ ...> |> Enum.to_list()
+ [{:stdout, "foo\n"}, {:stderr, "bar\n"}]
+ ```
+
+ For more details about stream API, see `Exile.stream!/2`.
+
+ For more details about inner working, please check `Exile.Process`
+ documentation.
+
+
## Rationale
Existing approaches
#### Port
Port is the default way of executing external commands. This is okay when you have control over the external program's implementation and the interaction is minimal. Port has several important issues.
* it can end up creating [zombie process](https://hexdocs.pm/elixir/Port.html#module-zombie-operating-system-processes)
* cannot selectively close stdin. This is required when the external programs act on EOF from stdin
* it sends command output as a message to the beam process. This does not put back pressure on the external program and can lead to exhausting VM memory
#### Middleware based solutions
Libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. solve the first two issues associated with ports - zombie processes and selectively closing STDIN - but not the third issue, having back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally, they use a port for reading the output and writing the input. Note that these libraries are solving a different subset of issues and have different functionality; please check the relevant project page for details.
* no back-pressure
* additional os process (middleware) for every execution of your program
* in a few cases, such as Porcelain, the user has to install this external program explicitly
* might not be suitable when the program requires constant communication between beam process and external program
On the plus side, unlike Exile, bugs in the implementation do not bring down the whole beam VM.
#### [ExCmd](https://github.com/akash-akya/ex_cmd)
This is my other stab at solving back pressure on the external program issue. It implements a demand-driven protocol using [odu](https://github.com/akash-akya/odu) to solve this. Since ExCmd is also a port based solution, the concerns previously mentioned apply to ExCmd too.
## Exile
Internally Exile uses non-blocking asynchronous system calls to interact with the external process. It does not use port's message based communication instead does raw stdio using NIF. Uses asynchronous system calls for IO. Most of the system calls are non-blocking, so it should not block the beam schedulers. Makes use of dirty-schedulers for IO.
**Highlights**
* Back pressure
* no middleware program
* no additional os process. No performance/resource cost
* no need to install any external command
* tries to handle zombie processes by attempting to clean up the external process. *But* as there is no middleware involved with Exile, it is still possible to end up with a zombie process if the program misbehaves.
* stream abstraction
* selectively consume stdout and stderr streams
If you are executing a huge number of external programs **concurrently** (more than a few hundred), you might have to increase the open file descriptors limit (`ulimit -n`)
Non-blocking io can be used for other interesting things. Such as reading named pipe (FIFO) files. `Exile.stream!(~w(cat data.pipe))` does not block schedulers, so you can open hundreds of fifo files unlike default `file` based io.
#### TODO
* add benchmarks results
### 🚨 Obligatory NIF warning
As with any NIF based solution, bugs or issues in Exile implementation **can bring down the beam VM**. But NIF implementation is comparatively small and mostly uses POSIX system calls. Also, spawned external processes are still completely isolated at OS level.
If all you want is to run a command with no communication, then just sticking with `System.cmd` is better.
### License
Copyright (c) 2020 Akash Hiremath.
Exile source code is released under Apache License 2.0. Check [LICENSE](LICENSE.md) for more information.
diff --git a/c_src/exile.c b/c_src/exile.c
index ed493e3..5e1645c 100644
--- a/c_src/exile.c
+++ b/c_src/exile.c
@@ -1,351 +1,361 @@
#include "erl_nif.h"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "utils.h"
#ifdef ERTS_DIRTY_SCHEDULERS
#define USE_DIRTY_IO ERL_NIF_DIRTY_JOB_IO_BOUND
#else
#define USE_DIRTY_IO 0
#endif
static const int UNBUFFERED_READ = -1;
static const int PIPE_BUF_SIZE = 65535;
static const int FD_CLOSED = -1;
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_UNDEFINED;
static ERL_NIF_TERM ATOM_INVALID_FD;
static ERL_NIF_TERM ATOM_SELECT_CANCEL_ERROR;
static ERL_NIF_TERM ATOM_EAGAIN;
+static ERL_NIF_TERM ATOM_EPIPE;
-static ERL_NIF_TERM ATOM_SIGKILL;
static ERL_NIF_TERM ATOM_SIGTERM;
+static ERL_NIF_TERM ATOM_SIGKILL;
+static ERL_NIF_TERM ATOM_SIGPIPE;
/* Close the descriptor stored in *fd unless the slot is already marked
 * closed, then mark the slot as closed so a repeated call does nothing. */
static void close_fd(int *fd) {
  if (*fd == FD_CLOSED)
    return;

  close(*fd);
  *fd = FD_CLOSED;
}
/* Ask the VM to stop selecting on *fd (ERL_NIF_SELECT_STOP).
 * Returns 0 when the slot is already marked closed, otherwise the value
 * returned by enif_select(); a negative value is also reported via perror. */
static int cancel_select(ErlNifEnv *env, int *fd) {
  if (*fd == FD_CLOSED)
    return 0;

  int status =
      enif_select(env, *fd, ERL_NIF_SELECT_STOP, fd, NULL, ATOM_UNDEFINED);
  if (status < 0)
    perror("cancel_select()");

  return status;
}
/* Resource destructor callback (installed as io_rt_init.dtor in on_load).
 * It only emits a debug trace; nothing is released here. */
static void io_resource_dtor(ErlNifEnv *env, void *obj) {
debug("Exile io_resource_dtor called");
}
/* Resource stop callback (installed as io_rt_init.stop in on_load).
 * Closes the descriptor handed to the callback.
 *
 * The trace is emitted before closing: close_fd() overwrites the local
 * `fd` with FD_CLOSED, so logging afterwards would always print -1
 * instead of the descriptor that was actually closed. */
static void io_resource_stop(ErlNifEnv *env, void *obj, int fd,
                             int is_direct_call) {
  debug("Exile io_resource_stop called %d", fd);
  close_fd(&fd);
}
/* Resource down callback (installed as io_rt_init.down in on_load).
 * `obj` is the resource payload, which holds the int file descriptor;
 * the callback cancels any pending select on that descriptor. */
static void io_resource_down(ErlNifEnv *env, void *obj, ErlNifPid *pid,
ErlNifMonitor *monitor) {
int *fd = (int *)obj;
cancel_select(env, fd);
debug("Exile io_resource_down called");
}
static ErlNifResourceTypeInit io_rt_init;
static ErlNifResourceType *FD_RT;
/* Wrap `term` in a 2-tuple whose first element is ATOM_OK. */
static inline ERL_NIF_TERM make_ok(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_OK, term);
}
/* Wrap `term` in a 2-tuple whose first element is ATOM_ERROR. */
static inline ERL_NIF_TERM make_error(ErlNifEnv *env, ERL_NIF_TERM term) {
return enif_make_tuple2(env, ATOM_ERROR, term);
}
/* Report the elapsed time between `start` and `stop` (both assumed to be in
 * microseconds) to the scheduler. The elapsed time is divided by 10 and the
 * result is raised to 1 when it is 0 and capped at 100 before being passed
 * to enif_consume_timeslice(). */
static void notify_consumed_timeslice(ErlNifEnv *env, ErlNifTime start,
                                      ErlNifTime stop) {
  ErlNifTime percent = (ErlNifTime)((stop - start) / 10);

  if (percent == 0)
    percent = 1;
  else if (percent > 100)
    percent = 100;

  enif_consume_timeslice(env, percent);
}
/* Register *fd with the VM for write-readiness notification.
 * Returns the enif_select() result; any non-zero result is also reported
 * via perror. */
static int select_write(ErlNifEnv *env, int *fd) {
  int status =
      enif_select(env, *fd, ERL_NIF_SELECT_WRITE, fd, NULL, ATOM_UNDEFINED);

  if (status == 0)
    return 0;

  perror("select_write()");
  return status;
}
/* NIF: write the binary argv[1] to the fd resource argv[0].
 *
 * Returns:
 *   {ok, BytesWritten}           - full or partial write; after a partial
 *                                  write a write-select is registered first
 *   {error, eagain}              - descriptor not ready; a write-select has
 *                                  been registered
 *   {error, epipe}               - write() failed with EPIPE
 *   {error, invalid_fd_resource} - argv[0] is not an FD_RT resource
 *   {error, Int}                 - select_write() result or the write() errno
 *   badarg                       - argv[1] is not a binary or is empty
 */
static ERL_NIF_TERM nif_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 2);
ErlNifTime start;
ssize_t size;
ErlNifBinary bin;
int write_errno;
int *fd;
start = enif_monotonic_time(ERL_NIF_USEC);
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (enif_inspect_binary(env, argv[1], &bin) != true)
return enif_make_badarg(env);
if (bin.size == 0)
return enif_make_badarg(env);
/* should we limit the bin.size here? */
size = write(*fd, bin.data, bin.size);
/* capture errno immediately, before any later call can overwrite it */
write_errno = errno;
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (size >= (ssize_t)bin.size) { // request completely satisfied
return make_ok(env, enif_make_int(env, size));
} else if (size >= 0) { // request partially satisfied
/* register for write readiness before reporting the partial count */
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_ok(env, enif_make_int(env, size));
} else if (write_errno == EAGAIN || write_errno == EWOULDBLOCK) { // busy
int retval = select_write(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
+ } else if (write_errno == EPIPE) {
+ return make_error(env, ATOM_EPIPE);
} else {
perror("write()");
return make_error(env, enif_make_int(env, write_errno));
}
}
/* Register *fd with the VM for read-readiness notification.
 * Returns the enif_select() result; any non-zero result is also reported
 * via perror. */
static int select_read(ErlNifEnv *env, int *fd) {
  int status =
      enif_select(env, *fd, ERL_NIF_SELECT_READ, fd, NULL, ATOM_UNDEFINED);

  if (status == 0)
    return 0;

  perror("select_read()");
  return status;
}
/* NIF: wrap the integer file descriptor argv[0] in an FD_RT resource and
 * monitor the calling process with that resource.
 *
 * Returns {ok, Resource} on success. Every failure path returns the bare
 * atom `error` (not a tuple) after releasing the resource. */
static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 1);
ERL_NIF_TERM term;
ErlNifPid pid;
int *fd;
int ret;
/* the resource payload is a single int holding the descriptor */
/* NOTE(review): the result of enif_alloc_resource is used without a NULL check */
fd = enif_alloc_resource(FD_RT, sizeof(int));
if (!enif_get_int(env, argv[0], fd))
goto error_exit;
if (!enif_self(env, &pid)) {
error("failed get self pid");
goto error_exit;
}
ret = enif_monitor_process(env, fd, &pid, NULL);
if (ret < 0) {
error("no down callback is provided");
goto error_exit;
} else if (ret > 0) {
error("pid is not alive");
goto error_exit;
}
term = enif_make_resource(env, fd);
/* drop the reference taken by enif_alloc_resource; `term` was created from the resource just above */
enif_release_resource(fd);
return make_ok(env, term);
error_exit:
enif_release_resource(fd);
return ATOM_ERROR;
}
/* Read at most `max_size` bytes from *fd into a new binary.
 *
 * `max_size` equal to UNBUFFERED_READ or larger than PIPE_BUF_SIZE is
 * replaced by PIPE_BUF_SIZE; any other value below 1 yields badarg.
 *
 * Returns:
 *   {ok, Binary}    - bytes read (read() returning 0 gives an empty binary)
 *   {error, eagain} - no data available; a read-select has been registered
 *   {error, epipe}  - read() failed with EPIPE
 *   {error, Int}    - select_read() result or the read() errno
 */
static ERL_NIF_TERM read_fd(ErlNifEnv *env, int *fd, int max_size) {
if (max_size == UNBUFFERED_READ) {
max_size = PIPE_BUF_SIZE;
} else if (max_size < 1) {
return enif_make_badarg(env);
} else if (max_size > PIPE_BUF_SIZE) {
max_size = PIPE_BUF_SIZE;
}
ErlNifTime start = enif_monotonic_time(ERL_NIF_USEC);
/* stack buffer sized by the clamped max_size (at most PIPE_BUF_SIZE bytes) */
unsigned char buf[max_size];
ssize_t result = read(*fd, buf, max_size);
/* capture errno immediately, before any later call can overwrite it */
int read_errno = errno;
ERL_NIF_TERM bin_term = 0;
if (result >= 0) {
/* no need to release this binary */
unsigned char *temp = enif_make_new_binary(env, result, &bin_term);
memcpy(temp, buf, result);
}
notify_consumed_timeslice(env, start, enif_monotonic_time(ERL_NIF_USEC));
if (result >= 0) {
return make_ok(env, bin_term);
} else if (read_errno == EAGAIN || read_errno == EWOULDBLOCK) { // busy
int retval = select_read(env, fd);
if (retval != 0)
return make_error(env, enif_make_int(env, retval));
return make_error(env, ATOM_EAGAIN);
+ } else if (read_errno == EPIPE) {
+ return make_error(env, ATOM_EPIPE);
} else {
perror("read_fd()");
return make_error(env, enif_make_int(env, read_errno));
}
}
/* NIF: read from the fd resource argv[0], with argv[1] as the maximum size.
 * Returns {error, invalid_fd_resource} when argv[0] is not an FD_RT
 * resource, badarg when argv[1] is not an integer, and otherwise whatever
 * read_fd() returns. */
static ERL_NIF_TERM nif_read(ErlNifEnv *env, int argc,
                             const ERL_NIF_TERM argv[]) {
  ASSERT_ARGC(argc, 2);

  int *fd;
  int max_size;

  if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
    return make_error(env, ATOM_INVALID_FD);

  if (enif_get_int(env, argv[1], &max_size))
    return read_fd(env, fd, max_size);

  return enif_make_badarg(env);
}
/* NIF: cancel any pending select on the fd resource argv[0], then close it.
 * Returns `ok`, {error, invalid_fd_resource} when argv[0] is not an FD_RT
 * resource, or {error, select_cancel_error} when cancelling the select
 * fails (the descriptor is left open in that case). */
static ERL_NIF_TERM nif_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 1);
int *fd;
if (!enif_get_resource(env, argv[0], FD_RT, (void **)&fd))
return make_error(env, ATOM_INVALID_FD);
if (cancel_select(env, fd) < 0)
return make_error(env, ATOM_SELECT_CANCEL_ERROR);
close_fd(fd);
return ATOM_OK;
}
/* NIF: report whether the OS process with pid argv[0] exists.
 *
 * Probes with kill(pid, 0), which performs the existence and permission
 * checks without delivering a signal. Returns `true` when the probe
 * succeeds or fails with EPERM (the process exists but we may not signal
 * it), `false` otherwise, and badarg when argv[0] is not an integer. */
static ERL_NIF_TERM nif_is_os_pid_alive(ErlNifEnv *env, int argc,
                                        const ERL_NIF_TERM argv[]) {
  ASSERT_ARGC(argc, 1);

  /* Read into a real int first: pid_t is not guaranteed to be an int, so
   * writing through a casted pointer is not portable. */
  int os_pid;
  if (!enif_get_int(env, argv[0], &os_pid))
    return enif_make_badarg(env);

  /* errno is only inspected when kill() has failed */
  if (kill((pid_t)os_pid, 0) == 0 || errno == EPERM)
    return ATOM_TRUE;

  return ATOM_FALSE;
}
/* NIF: send a signal to the OS process with pid argv[0].
 * argv[1] selects the signal by atom: sigkill, sigterm or sigpipe.
 *
 * Returns `ok` on success, {error, "failed to send signal"} when kill()
 * fails, and badarg when the pid is not an integer or the atom is not one
 * of the supported signals. */
static ERL_NIF_TERM nif_kill(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
ASSERT_ARGC(argc, 2);
pid_t pid;
int ret;
// we should not assume pid type to be `int`?
if (!enif_get_int(env, argv[0], (int *)&pid))
return enif_make_badarg(env);
if (enif_compare(argv[1], ATOM_SIGKILL) == 0)
ret = kill(pid, SIGKILL);
else if (enif_compare(argv[1], ATOM_SIGTERM) == 0)
ret = kill(pid, SIGTERM);
- else
+ else if (enif_compare(argv[1], ATOM_SIGPIPE) == 0) {
+ ret = kill(pid, SIGPIPE);
+ } else
return enif_make_badarg(env);
if (ret != 0) {
perror("[exile] failed to send signal");
return make_error(
env, enif_make_string(env, "failed to send signal", ERL_NIF_LATIN1));
}
return ATOM_OK;
}
/* NIF load callback: installs the resource callbacks, opens the
 * "exile_resource" resource type and creates the atoms used by the NIFs.
 * Always returns 0. */
static int on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM load_info) {
io_rt_init.dtor = io_resource_dtor;
io_rt_init.stop = io_resource_stop;
io_rt_init.down = io_resource_down;
/* NOTE(review): the result of enif_open_resource_type_x is not checked for NULL */
FD_RT =
enif_open_resource_type_x(env, "exile_resource", &io_rt_init,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
ATOM_TRUE = enif_make_atom(env, "true");
ATOM_FALSE = enif_make_atom(env, "false");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_UNDEFINED = enif_make_atom(env, "undefined");
ATOM_INVALID_FD = enif_make_atom(env, "invalid_fd_resource");
ATOM_EAGAIN = enif_make_atom(env, "eagain");
+ ATOM_EPIPE = enif_make_atom(env, "epipe");
ATOM_SELECT_CANCEL_ERROR = enif_make_atom(env, "select_cancel_error");
ATOM_SIGTERM = enif_make_atom(env, "sigterm");
ATOM_SIGKILL = enif_make_atom(env, "sigkill");
+ ATOM_SIGPIPE = enif_make_atom(env, "sigpipe");
return 0;
}
/* NIF unload callback: emits a debug trace and frees the private data
 * pointer.
 * NOTE(review): on_load never assigns *priv in the code visible here, so
 * `priv` is presumably NULL at this point — confirm enif_free(NULL) is
 * intended. */
static void on_unload(ErlNifEnv *env, void *priv) {
debug("exile unload");
enif_free(priv);
}
/* NIF export table: {name, arity, C implementation, scheduling flags}.
 * Every entry uses USE_DIRTY_IO, which is ERL_NIF_DIRTY_JOB_IO_BOUND when
 * ERTS_DIRTY_SCHEDULERS is defined and 0 otherwise. */
static ErlNifFunc nif_funcs[] = {
{"nif_read", 2, nif_read, USE_DIRTY_IO},
{"nif_create_fd", 1, nif_create_fd, USE_DIRTY_IO},
{"nif_write", 2, nif_write, USE_DIRTY_IO},
{"nif_close", 1, nif_close, USE_DIRTY_IO},
{"nif_is_os_pid_alive", 1, nif_is_os_pid_alive, USE_DIRTY_IO},
{"nif_kill", 2, nif_kill, USE_DIRTY_IO}};
-ERL_NIF_INIT(Elixir.Exile.ProcessNif, nif_funcs, &on_load, NULL, NULL,
+ERL_NIF_INIT(Elixir.Exile.Process.Nif, nif_funcs, &on_load, NULL, NULL,
&on_unload)
diff --git a/c_src/spawner.c b/c_src/spawner.c
index 104307c..6816368 100644
--- a/c_src/spawner.c
+++ b/c_src/spawner.c
@@ -1,252 +1,252 @@
#include <fcntl.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>
// these definitions are Linux only at the moment
// Fallback, compiled only when the headers do not define CMSG_LEN.
// Computes the offset of the data area inside a cmsghdr (measured by
// applying CMSG_DATA to a NULL header) plus the payload length `len`.
#ifndef CMSG_LEN
socklen_t CMSG_LEN(size_t len) {
return (CMSG_DATA((struct cmsghdr *)NULL) - (unsigned char *)NULL) + len;
}
#endif
// Fallback, compiled only when the headers do not define CMSG_SPACE.
// Builds a dummy header whose cmsg_len is CMSG_LEN(len) and returns the
// distance from that header to the position CMSG_NXTHDR reports for the
// next one.
#ifndef CMSG_SPACE
socklen_t CMSG_SPACE(size_t len) {
struct msghdr msg;
struct cmsghdr cmsg;
msg.msg_control = &cmsg;
msg.msg_controllen =
~0; /* To maximize the chance that CMSG_NXTHDR won't return NULL */
cmsg.cmsg_len = CMSG_LEN(len);
return (unsigned char *)CMSG_NXTHDR(&msg, &cmsg) - (unsigned char *)&cmsg;
}
#endif
// #define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, __func__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
} while (0)
#else
#define debug(...)
#endif
#define error(...) \
do { \
fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, __func__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
} while (0)
static const int PIPE_READ = 0;
static const int PIPE_WRITE = 1;
/* We are choosing an exit code which is not reserved see:
* https://www.tldp.org/LDP/abs/html/exitcodes.html. */
static const int FORK_EXEC_FAILURE = 125;
static int set_flag(int fd, int flags) {
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
}
/* Send the three descriptors to the peer connected on `socket` as a single
 * SCM_RIGHTS control message, in the order stdin, stdout, stderr.
 *
 * The message also carries a 256-byte payload; it is zero-filled so that
 * no uninitialised stack memory is written to the socket.
 *
 * Returns EXIT_SUCCESS, or EXIT_FAILURE when sendmsg() fails. */
static int send_io_fds(int socket, int stdin_fd, int stdout_fd, int stderr_fd) {
  struct msghdr msg = {0};
  struct cmsghdr *cmsg;
  int fds[3];
  char buf[CMSG_SPACE(3 * sizeof(int))];
  char dup[256];
  struct iovec io;

  memset(buf, '\0', sizeof(buf));
  memset(dup, '\0', sizeof(dup));

  io.iov_base = &dup;
  io.iov_len = sizeof(dup);

  msg.msg_iov = &io;
  msg.msg_iovlen = 1;
  msg.msg_control = buf;
  msg.msg_controllen = sizeof(buf);

  /* single control header carrying the descriptor array */
  cmsg = CMSG_FIRSTHDR(&msg);
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SCM_RIGHTS;
  cmsg->cmsg_len = CMSG_LEN(3 * sizeof(int));

  fds[0] = stdin_fd;
  fds[1] = stdout_fd;
  fds[2] = stderr_fd;

  debug("stdout: %d, stderr: %d", stdout_fd, stderr_fd);

  memcpy((int *)CMSG_DATA(cmsg), fds, 3 * sizeof(int));

  if (sendmsg(socket, &msg, 0) < 0) {
    debug("Failed to send message");
    return EXIT_FAILURE;
  }

  return EXIT_SUCCESS;
}
/* Close both ends of each of the three pipes. Entries that are not greater
 * than 0 are skipped (the array is zero-initialised by the caller, so 0
 * marks a pipe that was never created). */
static void close_pipes(int pipes[3][2]) {
  const int ends[2] = {PIPE_READ, PIPE_WRITE};

  for (int p = 0; p < 3; p++) {
    for (int e = 0; e < 2; e++) {
      int fd = pipes[p][ends[e]];
      if (fd > 0)
        close(fd);
    }
  }
}
/* Set up stdio pipes, hand one end of each pipe to the peer on `socket`,
 * rewire this process's standard streams to the other ends and exec `bin`.
 *
 * Returns 1 when creating the pipes, setting their flags or sending the
 * descriptors fails. Once the standard streams are being rewired, any
 * failure (dup2 or execvp) terminates the process with
 * _exit(FORK_EXEC_FAILURE); on success execvp() does not return. */
static int exec_process(char const *bin, char *const *args, int socket,
- bool use_stderr) {
+ bool enable_stderr) {
int pipes[3][2] = {{0, 0}, {0, 0}, {0, 0}};
int r_cmdin, w_cmdin, r_cmdout, w_cmdout, r_cmderr, w_cmderr;
int i;
if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1 ||
pipe(pipes[STDERR_FILENO]) == -1) {
perror("[spawner] failed to create pipes");
close_pipes(pipes);
return 1;
}
debug("created pipes");
/* r_* / w_* are the read / write ends of the command's stdin, stdout and stderr pipes */
r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
r_cmderr = pipes[STDERR_FILENO][PIPE_READ];
w_cmderr = pipes[STDERR_FILENO][PIPE_WRITE];
/* only the ends that are sent to the peer (w_cmdin, r_cmdout, r_cmderr) get O_NONBLOCK */
if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
set_flag(w_cmderr, O_CLOEXEC) < 0 ||
set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmderr, O_CLOEXEC | O_NONBLOCK) < 0) {
perror("[spawner] failed to set flags for pipes");
close_pipes(pipes);
return 1;
}
debug("set fd flags to pipes");
if (send_io_fds(socket, w_cmdin, r_cmdout, r_cmderr) != EXIT_SUCCESS) {
perror("[spawner] failed to send fd via socket");
close_pipes(pipes);
return 1;
}
debug("sent fds over UDS");
/* replace stdin with the read end of the stdin pipe */
close(STDIN_FILENO);
close(w_cmdin);
if (dup2(r_cmdin, STDIN_FILENO) < 0) {
perror("[spawner] failed to dup to stdin");
_exit(FORK_EXEC_FAILURE);
}
/* replace stdout with the write end of the stdout pipe */
close(STDOUT_FILENO);
close(r_cmdout);
if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
perror("[spawner] failed to dup to stdout");
_exit(FORK_EXEC_FAILURE);
}
/* stderr is redirected into its pipe only on request; otherwise both
 * local ends of the stderr pipe are closed and STDERR_FILENO is left
 * untouched */
- if (use_stderr) {
+ if (enable_stderr) {
close(STDERR_FILENO);
close(r_cmderr);
if (dup2(w_cmderr, STDERR_FILENO) < 0) {
perror("[spawner] failed to dup to stderr");
_exit(FORK_EXEC_FAILURE);
}
} else {
close(r_cmderr);
close(w_cmderr);
}
/* Close all non-standard io fds. Not closing STDERR */
for (i = STDERR_FILENO + 1; i < sysconf(_SC_OPEN_MAX); i++)
close(i);
debug("exec %s", bin);
execvp(bin, args);
/* reached only when execvp() failed */
perror("[spawner] execvp(): failed");
_exit(FORK_EXEC_FAILURE);
}
/* Connect to the unix domain socket at `socket_path` and hand over to
 * exec_process(). The stderr option is passed as a string and is enabled
 * only when it is exactly "true".
 *
 * Returns EXIT_FAILURE when the socket cannot be created or connected, or
 * when exec_process() returns non-zero. */
-static int spawn(const char *socket_path, const char *use_stderr_str,
+static int spawn(const char *socket_path, const char *enable_stderr_str,
const char *bin, char *const *args) {
int socket_fd;
struct sockaddr_un socket_addr;
- bool use_stderr;
+ bool enable_stderr;
- if (strcmp(use_stderr_str, "true") == 0) {
- use_stderr = true;
+ if (strcmp(enable_stderr_str, "true") == 0) {
+ enable_stderr = true;
} else {
- use_stderr = false;
+ enable_stderr = false;
}
socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (socket_fd == -1) {
debug("Failed to create socket");
return EXIT_FAILURE;
}
debug("created domain socket");
memset(&socket_addr, 0, sizeof(struct sockaddr_un));
socket_addr.sun_family = AF_UNIX;
/* copy at most size-1 bytes; the memset above keeps sun_path NUL-terminated */
strncpy(socket_addr.sun_path, socket_path, sizeof(socket_addr.sun_path) - 1);
if (connect(socket_fd, (struct sockaddr *)&socket_addr,
sizeof(struct sockaddr_un)) == -1) {
debug("Failed to connect to socket");
return EXIT_FAILURE;
}
debug("connected to exile");
- if (exec_process(bin, args, socket_fd, use_stderr) != 0)
+ if (exec_process(bin, args, socket_fd, enable_stderr) != 0)
return EXIT_FAILURE;
// we should never reach here
return EXIT_SUCCESS;
}
/* Entry point. Arguments: argv[1] = socket path, argv[2] = stderr option
 * string, argv[3] = program to execute, argv[4..] = its arguments.
 * Exits with EXIT_FAILURE when fewer than three arguments are given,
 * otherwise with the status returned by spawn(). */
int main(int argc, const char *argv[]) {
int status, i;
const char **exec_argv;
if (argc < 4) {
debug("expected at least 3 arguments, passed %d", argc);
status = EXIT_FAILURE;
} else {
/* exec_argv = argv[3..argc-1] followed by a terminating NULL */
/* NOTE(review): the malloc result is used without a NULL check */
exec_argv = malloc((argc - 3 + 1) * sizeof(char *));
for (i = 3; i < argc; i++)
exec_argv[i - 3] = argv[i];
exec_argv[i - 3] = NULL;
- debug("socket path: %s use_stderr: %s bin: %s", argv[1], argv[2], argv[3]);
+ debug("socket path: %s enable_stderr: %s bin: %s", argv[1], argv[2], argv[3]);
status = spawn(argv[1], argv[2], argv[3], (char *const *)exec_argv);
}
exit(status);
}
diff --git a/lib/exile.ex b/lib/exile.ex
index f188554..b45ac32 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,105 +1,227 @@
defmodule Exile do
- @moduledoc """
- Exile is an alternative for beam ports with back-pressure and non-blocking IO
+ @moduledoc ~S"""
+ Exile is an alternative for beam [ports](https://hexdocs.pm/elixir/Port.html)
+ with back-pressure and non-blocking IO.
+
+ ### Quick Start
+
+ Run a command and read from stdout
+
+ ```
+ iex> Exile.stream!(~w(echo Hello))
+ ...> |> Enum.into("") # collect as string
+ "Hello\n"
+ ```
+
+ Run a command with list of strings as input
+
+ ```
+ iex> Exile.stream!(~w(cat), input: ["Hello", " ", "World"])
+ ...> |> Enum.into("") # collect as string
+ "Hello World"
+ ```
+
+ Run a command with input as Stream
+
+ ```
+ iex> input_stream = Stream.map(1..10, fn num -> "#{num} " end)
+ iex> Exile.stream!(~w(cat), input: input_stream)
+ ...> |> Enum.into("")
+ "1 2 3 4 5 6 7 8 9 10 "
+ ```
+
+ Run a command with input as infinite stream
+
+ ```
+ # create infinite stream
+ iex> input_stream = Stream.repeatedly(fn -> "A" end)
+ iex> binary =
+ ...> Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true) # we need to ignore epipe since we are terminating the program before the input completes
+ ...> |> Stream.take(2) # we must limit since the input stream is infinite
+ ...> |> Enum.into("")
+ iex> is_binary(binary)
+ true
+ iex> "AAAAA" <> _ = binary
+ ```
+
+ Run a command with input Collectable
+
+ ```
+ # Exile calls the callback with a sink where the process can push the data
+ iex> Exile.stream!(~w(cat), input: fn sink ->
+ ...> Stream.map(1..10, fn num -> "#{num} " end)
+ ...> |> Stream.into(sink) # push to the external process
+ ...> |> Stream.run()
+ ...> end)
+ ...> |> Stream.take(100) # we must limit since the input stream is infinite
+ ...> |> Enum.into("")
+ "1 2 3 4 5 6 7 8 9 10 "
+ ```
+
+ When the command waits for the input stream to close
+
+ ```
+ # base64 command wait for the input to close and writes data to stdout at once
+ iex> Exile.stream!(~w(base64), input: ["abcdef"])
+ ...> |> Enum.into("")
+ "YWJjZGVm\n"
+ ```
+
+ When the command exits with an error
+
+ ```
+ iex> Exile.stream!(["sh", "-c", "exit 4"])
+ ...> |> Enum.into("")
+ ** (Exile.Process.Error) command exited with status: 4
+ ```
+
+ With `max_chunk_size` set
+
+ ```
+ iex> data =
+ ...> Exile.stream!(~w(cat /dev/urandom), max_chunk_size: 100, ignore_epipe: true)
+ ...> |> Stream.take(5)
+ ...> |> Enum.into("")
+ iex> byte_size(data)
+ 500
+ ```
+
+ When input and output run at different rate
+
+ ```
+ iex> input_stream = Stream.map(1..1000, fn num -> "X #{num} X\n" end)
+ iex> Exile.stream!(~w(grep 250), input: input_stream)
+ ...> |> Enum.into("")
+ "X 250 X\n"
+ ```
+
+ With stderr enabled
+
+ ```
+ iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], enable_stderr: true)
+ ...> |> Enum.to_list()
+ [{:stdout, "foo\n"}, {:stderr, "bar\n"}]
+ ```
+
+ For more details about stream API, see `Exile.stream!/2`.
+
+ For more details about inner working, please check `Exile.Process`
+ documentation.
"""
use Application
@doc false
# Application start callback: starts a DynamicSupervisor registered under
# the name Exile.WatcherSupervisor with a :one_for_one strategy.
def start(_type, _args) do
opts = [
name: Exile.WatcherSupervisor,
strategy: :one_for_one
]
- # we use DynamicSupervisor for cleaning up external processes on
+ # We use DynamicSupervisor for cleaning up external processes on
# :init.stop or SIGTERM
DynamicSupervisor.start_link(opts)
end
- @doc """
- Runs the given command with arguments and return an Enumerable to read command output.
+ @doc ~S"""
+ Runs the command with arguments and returns its stdout as a lazy
+ Enumerable stream, similar to [`Stream`](https://hexdocs.pm/elixir/Stream.html).
- First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`.
+ First parameter must be a list containing command with arguments.
+ Example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* Enumerable:
```
# List
Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
# Stream
- Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
+ Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65_536)) |> Enum.to_list()
```
* Collectable:
- If the input in a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. Return value of the function is ignored.
+ If the input is a function with arity 1, Exile will call that function
+ with a `Collectable` as the argument. The function must *push* input to this
+ collectable. Return value of the function is ignored.
```
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
- By defaults no input will be given to the command
+ By default no input is sent to the command.
- * `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
+ * `exit_timeout` - Duration to wait for external program to exit after completion
+ (when stream ends). Defaults to `:infinity`
- * `max_chunk_size` - Maximum size of each iodata chunk emitted by stream. Chunk size will be variable depending on the amount of data available at that time. Defaults to 65535
+ * `max_chunk_size` - Maximum size of iodata chunk emitted by the stream.
+ Chunk size can be less than the `max_chunk_size` depending on the amount of
+ data available to be read. Defaults to `65_535`
- * `use_stderr` - When set to true, stream will contain stderr output along with stdout output. Element of the stream will be of the form `{:stdout, iodata}` or `{:stderr, iodata}` to differentiate different streams. Defaults to false. See example below
+ * `enable_stderr` - When set to true, output stream will contain stderr data along
+ with stdout. Stream data will be of the form `{:stdout, iodata}` or `{:stderr, iodata}`
+ to differentiate different streams. Defaults to false. See example below
+ * `ignore_epipe` - When set to true, reader can exit early without raising error.
+ Typically writer gets `EPIPE` error on write when program terminate prematurely.
+ With `ignore_epipe` set to true this error will be ignored. This can be used to
+ match UNIX shell default behaviour. EPIPE is the error raised when the reader finishes
+ the reading and close output pipe before command completes. Defaults to `false`.
- All other options are passed to `Exile.Process.start_link/2`
+ Remaining options are passed to `Exile.Process.start_link/2`
### Examples
```
- Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
+ Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
Stream with stderr
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1),
- input: File.stream!("music_video.mkv", [], 65535),
- use_stderr: true
+ input: File.stream!("music_video.mkv", [], 65_535),
+ enable_stderr: true
)
|> Stream.transform(
fn ->
File.open!("music.mp3", [:write, :binary])
end,
fn elem, file ->
case elem do
{:stdout, data} ->
+ # write stdout data to a file
:ok = IO.binwrite(file, data)
{:stderr, msg} ->
+ # write stderr output to console
:ok = IO.write(msg)
end
{[], file}
end,
fn file ->
:ok = File.close(file)
end
)
|> Stream.run()
```
"""
@type collectable_func() :: (Collectable.t() -> any())
@spec stream!(nonempty_list(String.t()),
input: Enum.t() | collectable_func(),
exit_timeout: timeout(),
max_chunk_size: pos_integer()
- ) :: ExCmd.Stream.t()
+ ) :: Exile.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index a58aca3..4dfb8e1 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,669 +1,860 @@
defmodule Exile.Process do
- @moduledoc """
+ @moduledoc ~S"""
GenServer which wraps spawned external command.
- `Exile.stream!/1` should be preferred over using this. Use this only if you need more control over the life-cycle of IO streams and OS process.
+ Prefer `Exile.stream!/1` over using this module directly. Use this only
+ if you are familiar with the life-cycle and need more control over the
+ IO streams and OS process.
## Comparison with Port
- * it is demand driven. User explicitly has to `read` the command output, and the progress of the external command is controlled using OS pipes. Exile never load more output than we can consume, so we should never experience memory issues
+ * it is demand driven. User explicitly has to `read` the command
+ output, and the progress of the external command is controlled
+ using OS pipes. Exile never loads more output than we can consume,
+ so we should never experience memory issues
+
* it can close stdin while consuming output
- * tries to handle zombie process by attempting to cleanup external process. Note that there is no middleware involved with exile so it is still possible to endup with zombie process.
- * selectively consume stdout and stderr streams
- Internally Exile uses non-blocking asynchronous system calls to interact with the external process. It does not use port's message based communication, instead uses raw stdio and NIF. Uses asynchronous system calls for IO. Most of the system calls are non-blocking, so it should not block the beam schedulers. Make use of dirty-schedulers for IO
+ * tries to handle zombie process by attempting to cleanup
+ external process. Note that there is no middleware involved
+ with exile so it is still possible to end up with a zombie process.
+
+ * selectively consume stdout and stderr
+
+ Internally Exile uses non-blocking asynchronous system calls
+ to interact with the external process. It does not use port's
+ message based communication, instead uses raw stdio and NIF.
+ Uses asynchronous system calls for IO. Most of the system
+ calls are non-blocking, so it should not block the beam
+ schedulers. Make use of dirty-schedulers for IO
+
+ ## Introduction
+
+ `Exile.Process` is a process based wrapper around the external
+ process. It is similar to `port` as an entity but the interface is
+ different. All communication with the external process must happen
+ via `Exile.Process` interface.
+
+ The Exile process life-cycle is tied to the external process and its
+ owner. All system resources, such as open file-descriptors and the
+ external process, are cleaned up when the `Exile.Process` dies.
+
+ ### Owner
+
+ Each `Exile.Process` has an owner. And it will be the process which
+ created it (via `Exile.Process.start_link/2`). Process owner can not
+ be changed.
+
+ Owner process will be linked to the `Exile.Process`. So when the
+ exile process dies abnormally the owner will be killed too, and
+ vice versa. Owner process should avoid trapping the exit signal. If
+ you want to avoid the caller getting killed, create a separate process
+ as owner to run the command and monitor that process.
+
+ Only owner can get the exit status of the command, using
+ `Exile.Process.await_exit/2`. All exile processes **MUST** be
+ awaited. Exit status or reason is **ALWAYS** sent to the owner. It
+ is similar to [`Task`](https://hexdocs.pm/elixir/Task.html). If the
+ owner exits without `await_exit`, the exile process will be killed,
+ but if the owner continues without `await_exit` then the exile
+ process will linger around till the process exits.
+
+ ```
+ iex> alias Exile.Process
+ iex> {:ok, p} = Process.start_link(~w(echo hello))
+ iex> Process.read(p, 100)
+ {:ok, "hello\n"}
+ iex> Process.read(p, 100) # read till we get :eof
+ :eof
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ ### Pipe & Pipe Owner
+
+ Standard IO pipes/channels/streams of the external process such as
+ STDIN, STDOUT, STDERR are called as Pipes. User can either write or
+ read data from pipes.
+
+ Each pipe has an owner process and only that process can write or
+ read from the exile process. By default the process who created the
+ exile process is the owner of all the pipes. Pipe owner can be
+ changed using `Exile.Process.change_pipe_owner/3`.
+
+ Pipe owner is monitored and the pipes are closed automatically when
+ the pipe owner exits. Pipe Owner can close the pipe early using
+ `Exile.Process.close_stdin/1` etc.
+
+ `Exile.Process.await_exit/2` closes all of the caller owned pipes by
+ default.
+
+ ```
+ iex> {:ok, p} = Process.start_link(~w(cat))
+ iex> writer = Task.async(fn ->
+ ...> :ok = Process.change_pipe_owner(p, :stdin, self())
+ ...> Process.write(p, "Hello World")
+ ...> end)
+ iex> Task.await(writer)
+ :ok
+ iex> Process.read(p, 100)
+ {:ok, "Hello World"}
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ ### Pipe Operations
+
+ Pipe owner can read or write data to the owned pipe. `:stderr` by
+ default is disabled, data written to stderr will appear on the
+ console. You can enable reading stderr by passing `enable_stderr:
+ true` during process creation.
+
+ Special function `Exile.Process.read_any/2` can be used to read
+ from either stdout or stderr whichever has the data available.
+
+ All Pipe operations block the caller to have blocking as natural
+ back-pressure and to make the API simple. This is an important
+ feature of Exile, that is the ability to block caller when the stdio
+ buffer is full, exactly similar to how programs work on the shell
+ with pipes between them `cat large-file | grep "foo"`. Internally it
+ does not block the Exile process or VM (which is typically the case
+ with NIF calls). Because of this user can make concurrent read,
+ write to different pipes from separate processes. Internally Exile
+ uses asynchronous IO APIs to avoid blocking of VM or VM process.
+
+ Reading from stderr
+
+ ```
+ # write "Hello" to stdout and "World" to stderr
+ iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n")
+ iex> {:ok, p} = Process.start_link(["sh", "-c", script], enable_stderr: true)
+ iex> Process.read(p, 100)
+ {:ok, "Hello\n"}
+ iex> Process.read_stderr(p, 100)
+ {:ok, "World\n"}
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ Reading using `read_any`
+
+ ```
+ # write "Hello" to stdout and "World" to stderr
+ iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n")
+ iex> {:ok, p} = Process.start_link(["sh", "-c", script], enable_stderr: true)
+ iex> Process.read_any(p)
+ {:ok, {:stdout, "Hello\n"}}
+ iex> Process.read_any(p)
+ {:ok, {:stderr, "World\n"}}
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ ### Process Termination
+
+ When the owner dies (normally or abnormally) the Exile process is always
+ terminated irrespective of pipe status or process status. The external
+ process gets a chance to terminate gracefully; if that fails it will
+ be killed.
+
+ If owner calls `await_exit` then the owner owned pipes are closed
+ and we wait for external process to terminate, if the process
+ already terminated then call returns immediately with exit
+ status. Else command will be attempted to stop gracefully following
+ the exit sequence based on the timeout value (5s by default).
+
+ If owner calls `await_exit` with `timeout` as `:infinity` then
+ Exile does not attempt to forcefully stop the external command and
+ waits for the command to exit on its own. The `await_exit` call can be blocked
+ indefinitely waiting for external process to terminate.
+
+ If the external process exits on its own, the exit status is collected and
+ the Exile process will wait for the owner to close pipes. Most commands exit
+ when pipes are closed, so just ensuring that pipes are closed when the work is
+ done should be enough.
+
+ Example of process getting terminated by `SIGTERM` signal
+
+ ```
+ # sleep command does not watch for stdin or stdout, so closing the
+ # pipe does not terminate the sleep command.
+ iex> {:ok, p} = Process.start_link(~w(sleep 100000000)) # sleep indefinitely
+ iex> Process.await_exit(p, 100) # ensure `await_exit` finish within `100ms`. By default it waits for 5s
+ {:ok, 143} # 143 is the exit status when command exit due to SIGTERM
+ ```
+
+ ## Examples
+
+ Run a command without any input or output
+
+ ```
+ iex> {:ok, p} = Process.start_link(["sh", "-c", "exit 1"])
+ iex> Process.await_exit(p)
+ {:ok, 1}
+ ```
+
+ Single process reading and writing to the command
+
+ ```
+ # bc is a calculator, which reads from stdin and writes output to stdout
+ iex> {:ok, p} = Process.start_link(~w(bc))
+ iex> Process.write(p, "1 + 1\n") # there must be new-line to indicate the end of the input line
+ :ok
+ iex> Process.read(p)
+ {:ok, "2\n"}
+ iex> Process.write(p, "2 * 10 + 1\n")
+ :ok
+ iex> Process.read(p)
+ {:ok, "21\n"}
+ # We must close stdin to signal the `bc` command that we are done.
+ # since `await_exit` implicitly closes the pipes, in this case we don't have to
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ Running a command which flushes the output on stdin close. This is not
+ supported by Erlang/Elixir ports.
+
+ ```
+ # `base64` command reads all input and writes encoded output when stdin is closed.
+ iex> {:ok, p} = Process.start_link(~w(base64))
+ iex> Process.write(p, "abcdef")
+ :ok
+ iex> Process.close_stdin(p) # we can selectively close stdin and read all output
+ :ok
+ iex> Process.read(p)
+ {:ok, "YWJjZGVm\n"}
+ iex> Process.read(p) # typically it is better to read till we receive :eof when we are not sure how big the output data size is
+ :eof
+ iex> Process.await_exit(p)
+ {:ok, 0}
+ ```
+
+ Read and write to pipes in separate processes
+
+ ```
+ iex> {:ok, p} = Process.start_link(~w(cat))
+ iex> writer = Task.async(fn ->
+ ...> :ok = Process.change_pipe_owner(p, :stdin, self())
+ ...> Process.write(p, "Hello World")
+ ...> # no need to close the pipe explicitly here. Pipe will be closed automatically when process exit
+ ...> end)
+ iex> reader = Task.async(fn ->
+ ...> :ok = Process.change_pipe_owner(p, :stdout, self())
+ ...> Process.read(p)
+ ...> end)
+ iex> :timer.sleep(500) # wait for the reader and writer to change pipe owner, otherwise `await_exit` will close the pipes before we change pipe owner
+ iex> Process.await_exit(p, :infinity) # let the reader and writer take indefinite time to finish
+ {:ok, 0}
+ iex> Task.await(writer)
+ :ok
+ iex> Task.await(reader)
+ {:ok, "Hello World"}
+ ```
+
"""
use GenServer
- alias __MODULE__
- alias Exile.ProcessNif, as: Nif
- require Logger
+ alias Exile.Process.Exec
+ alias Exile.Process.Nif
+ alias Exile.Process.Operations
+ alias Exile.Process.Pipe
+ alias Exile.Process.State
- defstruct [
- :args,
- :errno,
- :port,
- :socket_path,
- :stdin,
- :stdout,
- :stderr,
- :status,
- :use_stderr,
- :await,
- :read_stdout,
- :read_stderr,
- :read_any,
- :write_stdin
- ]
-
- defmodule Pending do
- @moduledoc false
- defstruct bin: [], size: 0, client_pid: nil
- end
+ require Logger
defmodule Error do
defexception [:message]
end
- @default_opts [env: [], use_stderr: false]
- @default_buffer_size 65535
+ @type pipe_name :: :stdin | :stdout | :stderr
+
+ @type t :: %__MODULE__{
+ monitor_ref: reference(),
+ exit_ref: reference(),
+ pid: pid | nil,
+ owner: pid
+ }
+
+ defstruct [:monitor_ref, :exit_ref, :pid, :owner]
+
+ @type exit_status :: non_neg_integer
+
+ @default_opts [env: [], enable_stderr: false]
+ @default_buffer_size 65_535
+ @os_signal_timeout 1000
@doc """
- Starts `Exile.ProcessServer`
+ Starts `Exile.Process` server.
Starts external program using `cmd_with_args` with options `opts`
- `cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.
+ `cmd_with_args` must be a list containing command with arguments.
+ example: `["cat", "file.txt"]`.
### Options
+
* `cd` - the directory to run the command in
- * `env` - a list of tuples containing environment key-value. These can be accessed in the external program
- * `use_stderr` - when set to true, exile connects stderr stream for the consumption. Defaults to false. Note that when set to true stderr must be consumed to avoid external program from blocking
+
+ * `env` - a list of tuples containing environment key-value.
+ These can be accessed in the external program
+
+ * `enable_stderr` - when set to true, Exile connects stderr
+ pipe for the consumption. Defaults to false. Note that when set
+ to true stderr must be consumed to avoid external program from blocking.
+
+ Caller of the process will be the owner of the Exile Process.
+ And default owner of all opened pipes.
+
+ Please check module documentation for more details
"""
- @type process :: pid
@spec start_link(nonempty_list(String.t()),
cd: String.t(),
env: [{String.t(), String.t()}],
- use_stderr: boolean()
- ) :: {:ok, process} | {:error, any()}
+ enable_stderr: boolean()
+ ) :: {:ok, t} | {:error, any()}
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
- with {:ok, args} <- normalize_args(cmd_with_args, opts) do
- GenServer.start(__MODULE__, args)
+ case Exec.normalize_exec_args(cmd_with_args, opts) do
+ {:ok, args} ->
+ owner = self()
+ exit_ref = make_ref()
+ args = Map.merge(args, %{owner: owner, exit_ref: exit_ref})
+ {:ok, pid} = GenServer.start_link(__MODULE__, args)
+ ref = Process.monitor(pid)
+
+ process = %__MODULE__{
+ pid: pid,
+ monitor_ref: ref,
+ exit_ref: exit_ref,
+ owner: owner
+ }
+
+ {:ok, process}
+
+ error ->
+ error
end
end
@doc """
- Closes external program's input stream
+ Closes external program's standard input pipe (stdin).
+
+ Only owner of the pipe can close the pipe. This call will return
+ immediately.
"""
- @spec close_stdin(process) :: :ok | {:error, any()}
+ @spec close_stdin(t) :: :ok | {:error, :pipe_closed_or_invalid_caller} | {:error, any()}
def close_stdin(process) do
- GenServer.call(process, :close_stdin, :infinity)
+ GenServer.call(process.pid, {:close_pipe, :stdin}, :infinity)
end
@doc """
- Writes iodata `data` to program's input streams
+ Closes external program's standard output pipe (stdout)
- This blocks when the pipe is full
+ Only owner of the pipe can close the pipe. This call will return
+ immediately.
"""
- @spec write(process, binary) :: :ok | {:error, any()}
+ @spec close_stdout(t) :: :ok | {:error, any()}
+ def close_stdout(process) do
+ GenServer.call(process.pid, {:close_pipe, :stdout}, :infinity)
+ end
+
+ @doc """
+ Closes external program's standard error pipe (stderr)
+
+ Only owner of the pipe can close the pipe. This call will return
+ immediately.
+ """
+ @spec close_stderr(t) :: :ok | {:error, any()}
+ def close_stderr(process) do
+ GenServer.call(process.pid, {:close_pipe, :stderr}, :infinity)
+ end
+
+ @doc """
+ Writes iodata `data` to external program's standard input pipe.
+
+ This call blocks when the pipe is full. Returns `:ok` when
+ the complete data is written.
+ """
+ @spec write(t, binary) :: :ok | {:error, any()}
def write(process, iodata) do
- GenServer.call(process, {:write_stdin, IO.iodata_to_binary(iodata)}, :infinity)
+ binary = IO.iodata_to_binary(iodata)
+ GenServer.call(process.pid, {:write_stdin, binary}, :infinity)
end
@doc """
- Returns bytes from executed command's stdout stream with maximum size `max_size`.
+ Returns bytes from executed command's stdout with maximum size `max_size`.
+
+ Blocks if no data present in stdout pipe yet. And returns as soon as
+ data of any size is available.
- Blocks if no bytes are written to stdout stream yet. And returns as soon as bytes are available
+ Note that `max_size` is the maximum size of the returned data. But
+ the returned data can be less than that depending on how the program
+ flushes the data etc.
"""
- @spec read(process, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
+ @spec read(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
def read(process, max_size \\ @default_buffer_size)
when is_integer(max_size) and max_size > 0 do
- GenServer.call(process, {:read_stdout, max_size}, :infinity)
+ GenServer.call(process.pid, {:read_stdout, max_size}, :infinity)
end
@doc """
- Returns bytes from executed command's stderr stream with maximum size `max_size`.
+ Returns bytes from executed command's stderr with maximum size `max_size`.
+ Pipe must be enabled with `enable_stderr: true` to read the data.
- Blocks if no bytes are written to stdout stream yet. And returns as soon as bytes are available
+ Blocks if no bytes are written to stderr yet. And returns as soon as
+ bytes are available
+
+ Note that `max_size` is the maximum size of the returned data. But
+ the returned data can be less than that depending on how the program
+ flushes the data etc.
"""
- @spec read_stderr(process, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
+ @spec read_stderr(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
def read_stderr(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
- GenServer.call(process, {:read_stderr, size}, :infinity)
+ GenServer.call(process.pid, {:read_stderr, size}, :infinity)
end
@doc """
- Returns bytes from either stdout or stderr stream with maximum size `max_size` whichever is available.
+ Returns bytes from either stdout or stderr with maximum size
+ `max_size` whichever is available at that time.
+
+ Blocks if no bytes are written to stdout or stderr yet. And returns
+ as soon as data is available.
- Blocks if no bytes are written to stdout/stderr stream yet. And returns as soon as bytes are available
+ Note that `max_size` is the maximum size of the returned data. But
+ the returned data can be less than that depending on how the program
+ flushes the data etc.
"""
- @spec read_any(process, pos_integer()) ::
+ @spec read_any(t, pos_integer()) ::
{:ok, {:stdout, iodata}} | {:ok, {:stderr, iodata}} | :eof | {:error, any()}
def read_any(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
- GenServer.call(process, {:read_any, size}, :infinity)
+ GenServer.call(process.pid, {:read_stdout_or_stderr, size}, :infinity)
end
@doc """
- Sends signal to external program
+ Changes the Pipe owner of the pipe to specified pid.
+
+ Note that currently any process can change the pipe owner.
+
+ For more details about Pipe Owner, please check module docs.
"""
- @spec kill(process, :sigkill | :sigterm) :: :ok
- def kill(process, signal) when signal in [:sigkill, :sigterm] do
- GenServer.call(process, {:kill, signal}, :infinity)
+ @spec change_pipe_owner(t, pipe_name, pid) :: :ok | {:error, any()}
+ def change_pipe_owner(process, pipe_name, target_owner_pid) do
+ GenServer.call(
+ process.pid,
+ {:change_pipe_owner, pipe_name, target_owner_pid},
+ :infinity
+ )
end
@doc """
- Waits for the program to terminate.
+ Sends a system signal to the external program
+
+ Note that `:sigkill` kills the program unconditionally.
- If the program terminates before timeout, it returns `{:ok, exit_status}` else returns `:timeout`
+ Avoid sending signals manually, use `await_exit` instead.
"""
- @spec await_exit(process, timeout: timeout()) :: {:ok, integer()} | :timeout
- def await_exit(process, timeout \\ :infinity) do
- GenServer.call(process, {:await_exit, timeout}, :infinity)
+ @spec kill(t, :sigkill | :sigterm) :: :ok
+ def kill(process, signal) when signal in [:sigkill, :sigterm] do
+ GenServer.call(process.pid, {:kill, signal}, :infinity)
end
@doc """
- Returns OS pid of the command
+ Wait for the program to terminate and get exit status.
+
+ **ONLY** the Process owner can call this function. And all Exile
+ **process MUST** be awaited (Similar to Task).
+
+ Exile first politely asks the program to terminate by closing the
+ pipes owned by the process owner (by default process owner is the
+ pipes owner). Most programs terminate when standard pipes are
+ closed.
+
+ If you have changed the pipe owner to other process, you have to
+ close pipe yourself or wait for the program to exit.
+
+ If the program fails to terminate within the timeout (default 5s)
+ then the program will be killed using the exit sequence by sending
+ `SIGTERM`, `SIGKILL` signals in sequence.
+
+ When timeout is set to `:infinity`, `await_exit` waits indefinitely
+ for the program to terminate.
+
+ For more details check module documentation.
"""
- @spec os_pid(process) :: pos_integer()
- def os_pid(process) do
- GenServer.call(process, :os_pid, :infinity)
+ @spec await_exit(t, timeout :: timeout()) :: {:ok, exit_status}
+ def await_exit(process, timeout \\ 5000) do
+ %__MODULE__{
+ monitor_ref: monitor_ref,
+ exit_ref: exit_ref,
+ owner: owner,
+ pid: pid
+ } = process
+
+ if self() != owner do
+ raise ArgumentError,
+ "task #{inspect(process)} exit status can only be queried by owner but was queried from #{inspect(self())}"
+ end
+
+ graceful_exit_timeout =
+ if timeout == :infinity do
+ :infinity
+ else
+ # process exit steps should finish before receive timeout exceeds
+ # receive timeout is max allowed time for the `await_exit` call to block
+ max(0, timeout - 100)
+ end
+
+ :ok = GenServer.cast(pid, {:prepare_exit, owner, graceful_exit_timeout})
+
+ receive do
+ {^exit_ref, exit_status} ->
+ exit_status
+
+ {:DOWN, ^monitor_ref, _, _proc, reason} ->
+ exit({reason, {__MODULE__, :await_exit, [process, timeout]}})
+ after
+ # ideally we should never hit this case since the process must
+ # be terminated before the timeout and we should have received
+ # `DOWN` message
+ timeout ->
+ exit({:timeout, {__MODULE__, :await_exit, [process, timeout]}})
+ end
end
@doc """
- Stops the exile process, external program will be terminated in the background
+ Returns OS pid of the command
+
+ This is meant only for debugging. Avoid interacting with the
+ external process directly
"""
- @spec stop(process) :: :ok
- def stop(process), do: GenServer.call(process, :stop, :infinity)
+ @spec os_pid(t) :: pos_integer()
+ def os_pid(process) do
+ GenServer.call(process.pid, :os_pid, :infinity)
+ end
## Server
+ @impl true
def init(args) do
- {use_stderr, args} = Map.pop(args, :use_stderr)
+ {enable_stderr, args} = Map.pop(args, :enable_stderr)
+ {owner, args} = Map.pop!(args, :owner)
+ {exit_ref, args} = Map.pop!(args, :exit_ref)
- state = %__MODULE__{
+ state = %State{
args: args,
- errno: nil,
+ owner: owner,
status: :init,
- await: %{},
- use_stderr: use_stderr,
- read_stdout: %Pending{},
- read_stderr: %Pending{},
- read_any: %Pending{},
- write_stdin: %Pending{}
+ enable_stderr: enable_stderr,
+ operations: Operations.new(),
+ exit_ref: exit_ref,
+ monitor_ref: Process.monitor(owner)
}
{:ok, state, {:continue, nil}}
end
+ @impl true
def handle_continue(nil, state) do
- Elixir.Process.flag(:trap_exit, true)
- {:noreply, start_process(state)}
+ {:noreply, exec(state)}
end
- def handle_call(:stop, _from, state) do
- # TODO: pending write and read should receive "stopped" return
- # value instead of exit signal
- {:stop, :normal, :ok, state}
- end
+ @impl true
+ def handle_cast({:prepare_exit, caller, timeout}, state) do
+ state =
+ Enum.reduce(state.pipes, state, fn {_pipe_name, pipe}, state ->
+ case Pipe.close(pipe, caller) do
+ {:ok, pipe} ->
+ {:ok, state} = State.put_pipe(state, pipe.name, pipe)
+ state
+
+ {:error, _} ->
+ state
+ end
+ end)
- def handle_call(:close_stdin, _from, state) do
- case state.status do
- {:exit, _} -> {:reply, :ok, state}
- _ -> do_close(state, :stdin)
+ case maybe_shutdown(state) do
+ {:stop, :normal, state} ->
+ {:stop, :normal, state}
+
+ {:noreply, state} ->
+ if timeout == :infinity do
+ {:noreply, state}
+ else
+ total_stages = 3
+ stage_timeout = div(timeout, total_stages)
+ handle_info({:prepare_exit, :normal_exit, stage_timeout}, state)
+ end
end
end
- def handle_call({:await_exit, _}, _from, %{status: {:exit, status}} = state) do
- {:reply, {:ok, {:exit, status}}, state}
+ @impl true
+ def handle_call({:change_pipe_owner, pipe_name, new_owner}, _from, state) do
+ with {:ok, pipe} <- State.pipe(state, pipe_name),
+ {:ok, new_pipe} <- Pipe.set_owner(pipe, new_owner),
+ {:ok, state} <- State.put_pipe(state, pipe_name, new_pipe) do
+ {:reply, :ok, state}
+ else
+ {:error, _} = error ->
+ {:reply, error, state}
+ end
end
- def handle_call({:await_exit, timeout}, from, %{status: :start} = state) do
- tref =
- if timeout != :infinity do
- Elixir.Process.send_after(self(), {:await_exit_timeout, from}, timeout)
- else
- nil
- end
-
- {:noreply, %Process{state | await: Map.put(state.await, from, tref)}}
+ def handle_call({:close_pipe, pipe_name}, {caller, _} = from, state) do
+ with {:ok, pipe} <- State.pipe(state, pipe_name),
+ {:ok, new_pipe} <- Pipe.close(pipe, caller),
+ :ok <- GenServer.reply(from, :ok),
+ {:ok, new_state} <- State.put_pipe(state, pipe_name, new_pipe) do
+ maybe_shutdown(new_state)
+ else
+ {:error, _} = ret ->
+ {:reply, ret, state}
+ end
end
def handle_call({:read_stdout, size}, from, state) do
- case can_read?(state, :stdout) do
- :ok ->
- pending = %Pending{size: size, client_pid: from}
- do_read_stdout(%Process{state | read_stdout: pending})
-
- error ->
- GenServer.reply(from, error)
+ case Operations.read(state, {:read_stdout, from, size}) do
+ {:noreply, state} ->
{:noreply, state}
+
+ ret ->
+ {:reply, ret, state}
end
end
def handle_call({:read_stderr, size}, from, state) do
- case can_read?(state, :stderr) do
- :ok ->
- pending = %Pending{size: size, client_pid: from}
- do_read_stderr(%Process{state | read_stderr: pending})
-
- error ->
- GenServer.reply(from, error)
+ case Operations.read(state, {:read_stderr, from, size}) do
+ {:noreply, state} ->
{:noreply, state}
+
+ ret ->
+ {:reply, ret, state}
end
end
- def handle_call({:read_any, size}, from, state) do
- case can_read?(state, :any) do
- :ok ->
- pending = %Pending{size: size, client_pid: from}
- do_read_any(%Process{state | read_any: pending})
-
- error ->
- GenServer.reply(from, error)
+ def handle_call({:read_stdout_or_stderr, size}, from, state) do
+ case Operations.read_any(state, {:read_stdout_or_stderr, from, size}) do
+ {:noreply, state} ->
{:noreply, state}
- end
- end
- def handle_call(_, _from, %{status: {:exit, status}} = state) do
- {:reply, {:error, {:exit, status}}, state}
+ ret ->
+ {:reply, ret, state}
+ end
end
def handle_call({:write_stdin, binary}, from, state) do
- cond do
- !is_binary(binary) ->
- {:reply, {:error, :not_binary}, state}
-
- state.write_stdin.client_pid ->
- {:reply, {:error, :write_stdin}, state}
+ case Operations.write(state, {:write_stdin, from, binary}) do
+ {:noreply, state} ->
+ {:noreply, state}
- true ->
- pending = %Pending{bin: binary, client_pid: from}
- do_write(%Process{state | write_stdin: pending})
+ ret ->
+ {:reply, ret, state}
end
end
def handle_call(:os_pid, _from, state) do
case Port.info(state.port, :os_pid) do
{:os_pid, os_pid} ->
{:reply, {:ok, os_pid}, state}
- :undefined ->
+ nil ->
Logger.debug("Process not alive")
{:reply, :undefined, state}
end
end
def handle_call({:kill, signal}, _from, state) do
{:reply, signal(state.port, signal), state}
end
- def handle_info({:await_exit_timeout, from}, state) do
- GenServer.reply(from, :timeout)
- {:noreply, %Process{state | await: Map.delete(state.await, from)}}
- end
-
- def handle_info({:select, _write_resource, _ref, :ready_output}, state), do: do_write(state)
-
- def handle_info({:select, read_resource, _ref, :ready_input}, state) do
+ @impl true
+ def handle_info({:prepare_exit, current_stage, timeout}, %{status: status, port: port} = state) do
cond do
- state.read_any.client_pid ->
- stream =
- cond do
- read_resource == state.stdout -> :stdout
- read_resource == state.stderr -> :stderr
- end
-
- do_read_any(state, stream)
-
- state.read_stdout.client_pid && read_resource == state.stdout ->
- do_read_stdout(state)
-
- state.read_stderr.client_pid && read_resource == state.stderr ->
- do_read_stderr(state)
-
- true ->
+ status != :running ->
{:noreply, state}
- end
- end
-
- def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state),
- do: handle_port_exit(exit_status, state)
-
- def handle_info({:EXIT, port, :normal}, %{port: port} = state), do: {:noreply, state}
-
- def handle_info({:EXIT, _, reason}, state), do: {:stop, reason, state}
-
- defp handle_port_exit(exit_status, state) do
- Enum.each(state.await, fn {from, tref} ->
- GenServer.reply(from, {:ok, {:exit, exit_status}})
-
- if tref do
- Elixir.Process.cancel_timer(tref)
- end
- end)
- {:noreply, %Process{state | status: {:exit, exit_status}, await: %{}}}
- end
-
- defmacrop eof, do: {:ok, <<>>}
- defmacrop eagain, do: {:error, :eagain}
-
- defp do_write(%Process{write_stdin: %Pending{bin: <<>>}} = state) do
- reply_action(state, :write_stdin, :ok)
- end
-
- defp do_write(%Process{write_stdin: pending} = state) do
- bin_size = byte_size(pending.bin)
-
- case Nif.nif_write(state.stdin, pending.bin) do
- {:ok, size} when size < bin_size ->
- binary = binary_part(pending.bin, size, bin_size - size)
- noreply_action(%{state | write_stdin: %Pending{pending | bin: binary}})
-
- {:ok, _size} ->
- reply_action(state, :write_stdin, :ok)
-
- eagain() ->
- noreply_action(state)
-
- {:error, errno} ->
- reply_action(%Process{state | errno: errno}, :write_stdin, {:error, errno})
- end
- end
-
- defp do_read_stdout(%Process{read_stdout: pending} = state) do
- case Nif.nif_read(state.stdout, pending.size) do
- eof() ->
- reply_action(state, :read_stdout, :eof)
+ current_stage == :normal_exit ->
+ Elixir.Process.send_after(self(), {:prepare_exit, :sigterm, timeout}, timeout)
+ {:noreply, state}
- {:ok, binary} ->
- reply_action(state, :read_stdout, {:ok, binary})
+ current_stage == :sigterm ->
+ signal(port, :sigterm)
+ Elixir.Process.send_after(self(), {:prepare_exit, :sigkill, timeout}, timeout)
+ {:noreply, state}
- eagain() ->
- noreply_action(state)
+ current_stage == :sigkill ->
+ signal(port, :sigkill)
+ Elixir.Process.send_after(self(), {:prepare_exit, :stop, timeout}, timeout)
+ {:noreply, state}
- {:error, errno} ->
- reply_action(%Process{state | errno: errno}, :read_stdout, {:error, errno})
+ # this should never happen, since sigkill signal can not be ignored by the OS process
+ current_stage == :stop ->
+ {:stop, :sigkill_timeout, state}
end
end
- defp do_read_stderr(%Process{read_stderr: pending} = state) do
- case Nif.nif_read(state.stderr, pending.size) do
- eof() ->
- reply_action(state, :read_stderr, :eof)
-
- {:ok, binary} ->
- reply_action(state, :read_stderr, {:ok, binary})
-
- eagain() ->
- noreply_action(state)
-
- {:error, errno} ->
- reply_action(%Process{state | errno: errno}, :read_stderr, {:error, errno})
- end
- end
+ # stdin became writable again: resume the parked `:write_stdin` operation, if any.
+ # `Operations.write/2` either parks the remainder again (`{:noreply, state}`) or
+ # yields the final result, which is sent to the caller that issued the write.
+ def handle_info({:select, write_resource, _ref, :ready_output}, state) do
+ :stdin = State.pipe_name_for_fd(state, write_resource)
- defp do_read_any(state, stream_hint \\ :stdout) do
- %Process{read_any: pending, use_stderr: use_stderr} = state
+ with {:ok, {:write_stdin, from, _bin} = operation, state} <-
+ State.pop_operation(state, :write_stdin) do
+ case Operations.write(state, operation) do
+ {:noreply, state} ->
+ {:noreply, state}
- other_stream =
- case stream_hint do
- :stdout -> :stderr
- :stderr -> :stdout
+ ret ->
+ GenServer.reply(from, ret)
+ {:noreply, state}
end
-
- case Nif.nif_read(stream_fd(state, stream_hint), pending.size) do
- ret when ret in [eof(), eagain()] and use_stderr == true ->
- case {ret, Nif.nif_read(stream_fd(state, other_stream), pending.size)} do
- {eof(), eof()} ->
- reply_action(state, :read_any, :eof)
-
- {_, {:ok, binary}} ->
- reply_action(state, :read_any, {:ok, {other_stream, binary}})
-
- {_, eagain()} ->
- noreply_action(state)
-
- {_, {:error, errno}} ->
- reply_action(%Process{state | errno: errno}, :read_any, {:error, errno})
- end
-
- eof() ->
- reply_action(state, :read_any, :eof)
-
- {:ok, binary} ->
- reply_action(state, :read_any, {:ok, {stream_hint, binary}})
-
- eagain() ->
- noreply_action(state)
-
- {:error, errno} ->
- reply_action(%Process{state | errno: errno}, :read_any, {:error, errno})
+ else
+ # no write is pending for this notification: ignore it, same as the
+ # `:ready_input` handler does, instead of returning an invalid callback value
+ {:error, _error} ->
+ {:noreply, state}
end
end
- defp do_close(state, stream) do
- ret = Nif.nif_close(stream_fd(state, stream))
- {:reply, ret, state}
- end
-
- defp stream_fd(state, stream) do
- case stream do
- :stdin -> state.stdin
- :stdout -> state.stdout
- :stderr -> state.stderr
- end
- end
-
- defp can_read?(state, :stdout) do
- cond do
- state.read_stdout.client_pid ->
- {:error, :pending_stdout_read}
+ def handle_info({:select, read_resource, _ref, :ready_input}, state) do
+ pipe_name = State.pipe_name_for_fd(state, read_resource)
- true ->
- :ok
- end
- end
+ with {:ok, operation_name} <- Operations.match_pending_operation(state, pipe_name),
+ {:ok, {_, from, _} = operation, state} <- State.pop_operation(state, operation_name) do
+ ret =
+ case operation_name do
+ :read_stdout_or_stderr ->
+ Operations.read_any(state, operation)
- defp can_read?(state, :stderr) do
- cond do
- !state.use_stderr ->
- {:error, :cannot_read_stderr}
-
- state.read_stderr.client_pid ->
- {:error, :pending_stderr_read}
+ name when name in [:read_stdout, :read_stderr] ->
+ Operations.read(state, operation)
+ end
- true ->
- :ok
- end
- end
+ case ret do
+ {:noreply, state} ->
+ {:noreply, state}
- defp can_read?(state, :any) do
- with :ok <- can_read?(state, :stdout) do
- if state.use_stderr do
- can_read?(state, :stderr)
- else
- :ok
+ ret ->
+ GenServer.reply(from, ret)
+ {:noreply, state}
end
+ else
+ {:error, _error} ->
+ {:noreply, state}
end
end
- defp signal(port, sig) when sig in [:sigkill, :sigterm] do
- case Port.info(port, :os_pid) do
- {:os_pid, os_pid} -> Nif.nif_kill(os_pid, sig)
- :undefined -> {:error, :process_not_alive}
- end
- end
-
- @spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
-
- defp start_process(state) do
- %{args: %{cmd_with_args: cmd_with_args, cd: cd, env: env}, use_stderr: use_stderr} = state
-
- socket_path = socket_path()
- {:ok, sock} = :socket.open(:local, :stream, :default)
-
- try do
- :ok = socket_bind(sock, socket_path)
- :ok = :socket.listen(sock)
-
- spawner_cmdline_args = [socket_path, to_string(use_stderr) | cmd_with_args]
-
- port_opts =
- [:nouse_stdio, :exit_status, :binary, args: spawner_cmdline_args] ++
- prune_nils(env: env, cd: cd)
-
- port = Port.open({:spawn_executable, @spawner_path}, port_opts)
-
- {:os_pid, os_pid} = Port.info(port, :os_pid)
- Exile.Watcher.watch(self(), os_pid, socket_path)
-
- {stdin, stdout, stderr} = receive_fds(sock, state.use_stderr)
-
- %Process{
- state
- | port: port,
- status: :start,
- socket_path: socket_path,
- stdin: stdin,
- stdout: stdout,
- stderr: stderr
- }
- after
- :socket.close(sock)
+ def handle_info({:stop, :sigterm}, state) do
+ if state.status == :running do
+ signal(state.port, :sigkill)
+ Elixir.Process.send_after(self(), {:stop, :sigkill}, @os_signal_timeout)
end
- end
-
- @socket_timeout 2000
-
- defp receive_fds(lsock, use_stderr) do
- {:ok, sock} = :socket.accept(lsock, @socket_timeout)
-
- try do
- {:ok, msg} = :socket.recvmsg(sock, @socket_timeout)
- %{ctrl: [%{data: data, level: :socket, type: :rights}]} = msg
- <<stdin_fd::native-32, stdout_fd::native-32, stderr_fd::native-32, _::binary>> = data
-
- {:ok, stdout} = Nif.nif_create_fd(stdout_fd)
- {:ok, stdin} = Nif.nif_create_fd(stdin_fd)
-
- {:ok, stderr} =
- if use_stderr do
- Nif.nif_create_fd(stderr_fd)
- else
- {:ok, nil}
- end
-
- {stdin, stdout, stderr}
- after
- :socket.close(sock)
- end
+ {:noreply, state}
end
- defp socket_bind(sock, path) do
- case :socket.bind(sock, %{family: :local, path: path}) do
- :ok -> :ok
- # for OTP version <= 24 compatibility
- {:ok, _} -> :ok
- other -> other
+ def handle_info({:stop, :sigkill}, state) do
+ if state.status == :running do
+ # this should never happen, since sigkill signal can not be handled
+ {:stop, :sigkill_timeout, state}
+ else
+ {:noreply, state}
end
end
- defp socket_path do
- str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
- path = Path.join(System.tmp_dir!(), str)
- _ = :file.delete(path)
- path
+ def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state) do
+ send(state.owner, {state.exit_ref, {:ok, exit_status}})
+ state = State.set_status(state, {:exit, exit_status})
+ maybe_shutdown(state)
end
- defp prune_nils(kv) do
- Enum.reject(kv, fn {_, v} -> is_nil(v) end)
+ # we are only interested in Port exit signals
+ def handle_info({:EXIT, port, reason}, %State{port: port} = state) when reason != :normal do
+ send(state.owner, {state.exit_ref, {:error, reason}})
+ state = State.set_status(state, {:exit, {:error, reason}})
+ maybe_shutdown(state)
end
- defp reply_action(state, action, return_value) do
- pending = Map.fetch!(state, action)
-
- :ok = GenServer.reply(pending.client_pid, return_value)
- {:noreply, Map.put(state, action, %Pending{})}
+ def handle_info({:EXIT, port, :normal}, %{port: port} = state) do
+ maybe_shutdown(state)
end
- defp noreply_action(state) do
- {:noreply, state}
+ # Shut down unconditionally when the process owner goes down; the owner's
+ # exit reason is reused as the stop reason.
+ # Since the Exile process is linked to the owner, in case of an owner crash
+ # the Exile process will be killed by the VM.
+ def handle_info(
+ {:DOWN, owner_ref, :process, _pid, reason},
+ %State{monitor_ref: owner_ref} = state
+ ) do
+ {:stop, reason, state}
+ end
- defp normalize_cmd(arg) do
- case arg do
- [cmd | _] when is_binary(cmd) ->
- path = System.find_executable(cmd)
+ def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
+ state =
+ Enum.reduce(state.pipes, state, fn {_pipe_name, pipe}, state ->
+ case Pipe.close(pipe, pid) do
+ {:ok, pipe} ->
+ {:ok, state} = State.put_pipe(state, pipe.name, pipe)
+ state
- if path do
- {:ok, to_charlist(path)}
- else
- {:error, "command not found: #{inspect(cmd)}"}
+ {:error, _} ->
+ state
end
+ end)
- _ ->
- {:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
- end
+ maybe_shutdown(state)
end
- defp normalize_cmd_args([_ | args]) do
- if is_list(args) && Enum.all?(args, &is_binary/1) do
- {:ok, Enum.map(args, &to_charlist/1)}
+ @type signal :: :sigkill | :sigterm
+
+ @spec signal(port, signal) :: :ok | {:error, :invalid_signal} | {:error, :process_not_alive}
+ defp signal(port, signal) do
+ with true <- signal in [:sigkill, :sigterm],
+ {:os_pid, os_pid} <- Port.info(port, :os_pid) do
+ Nif.nif_kill(os_pid, signal)
else
- {:error, "command arguments must be list of strings. #{inspect(args)}"}
- end
- end
+ false ->
+ {:error, :invalid_signal}
- defp normalize_cd(cd) do
- case cd do
nil ->
- {:ok, ''}
-
- cd when is_binary(cd) ->
- if File.exists?(cd) && File.dir?(cd) do
- {:ok, to_charlist(cd)}
- else
- {:error, "`:cd` must be valid directory path"}
- end
-
- _ ->
- {:error, "`:cd` must be a binary string"}
+ {:error, :process_not_alive}
end
end
- defp normalize_env(env) do
- case env do
- nil ->
- {:ok, []}
-
- env when is_list(env) or is_map(env) ->
- env =
- Enum.map(env, fn {key, value} ->
- {to_charlist(key), to_charlist(value)}
- end)
-
- {:ok, env}
-
- _ ->
- {:error, "`:env` must be a map or list of `{string, string}`"}
- end
- end
+ @spec maybe_shutdown(State.t()) :: {:stop, :normal, State.t()} | {:noreply, State.t()}
+ defp maybe_shutdown(state) do
+ open_pipes_count =
+ state.pipes
+ |> Map.values()
+ |> Enum.count(&Pipe.open?/1)
- defp normalize_use_stderr(use_stderr) do
- case use_stderr do
- nil ->
- {:ok, false}
-
- use_stderr when is_boolean(use_stderr) ->
- {:ok, use_stderr}
-
- _ ->
- {:error, ":use_stderr must be a boolean"}
+ if open_pipes_count == 0 && !(state.status in [:init, :running]) do
+ {:stop, :normal, state}
+ else
+ {:noreply, state}
end
end
- defp validate_opts_fields(opts) do
- {_, additional_opts} = Keyword.split(opts, [:cd, :env, :use_stderr])
+ @spec exec(State.t()) :: State.t()
+ defp exec(state) do
+ %{
+ port: port,
+ stdin: stdin_fd,
+ stdout: stdout_fd,
+ stderr: stderr_fd
+ } = Exec.start(state.args, state.enable_stderr)
- if Enum.empty?(additional_opts) do
- :ok
- else
- {:error, "invalid opts: #{inspect(additional_opts)}"}
- end
- end
+ stderr =
+ if state.enable_stderr do
+ Pipe.new(:stderr, stderr_fd, state.owner)
+ else
+ Pipe.new(:stderr)
+ end
- defp normalize_args(cmd_with_args, opts) do
- with {:ok, cmd} <- normalize_cmd(cmd_with_args),
- {:ok, args} <- normalize_cmd_args(cmd_with_args),
- :ok <- validate_opts_fields(opts),
- {:ok, cd} <- normalize_cd(opts[:cd]),
- {:ok, use_stderr} <- normalize_use_stderr(opts[:use_stderr]),
- {:ok, env} <- normalize_env(opts[:env]) do
- {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, use_stderr: use_stderr}}
- end
+ %State{
+ state
+ | port: port,
+ status: :running,
+ pipes: %{
+ stdin: Pipe.new(:stdin, stdin_fd, state.owner),
+ stdout: Pipe.new(:stdout, stdout_fd, state.owner),
+ stderr: stderr
+ }
+ }
end
end
diff --git a/lib/exile/process/exec.ex b/lib/exile/process/exec.ex
new file mode 100644
index 0000000..d22334c
--- /dev/null
+++ b/lib/exile/process/exec.ex
@@ -0,0 +1,211 @@
+defmodule Exile.Process.Exec do
+ @moduledoc false
+
+ alias Exile.Process.Nif
+ alias Exile.Process.Pipe
+
+ @type args :: %{
+ cmd_with_args: [String.t()],
+ cd: String.t(),
+ env: [{String.t(), String.t()}]
+ }
+
+ @spawner_path :filename.join(:code.priv_dir(:exile), "spawner")
+
+ # Spawns the external command through the `spawner` executable and returns the
+ # port together with the stdin/stdout/stderr handles received over a temporary
+ # local (unix domain) socket.
+ #
+ # The listening socket is always closed and its path removed afterwards,
+ # whether or not spawning succeeded.
+ @spec start(args, boolean()) :: %{
+ port: port,
+ stdin: non_neg_integer(),
+ stdout: non_neg_integer(),
+ stderr: non_neg_integer()
+ }
+ def start(
+ %{
+ cmd_with_args: cmd_with_args,
+ cd: cd,
+ env: env
+ },
+ enable_stderr
+ ) do
+ socket_path = socket_path()
+ {:ok, sock} = :socket.open(:local, :stream, :default)
+
+ try do
+ :ok = socket_bind(sock, socket_path)
+ :ok = :socket.listen(sock)
+
+ spawner_cmdline_args = [socket_path, to_string(enable_stderr) | cmd_with_args]
+
+ port_opts =
+ [:nouse_stdio, :exit_status, :binary, args: spawner_cmdline_args] ++
+ prune_nils(env: env, cd: cd)
+
+ port = Port.open({:spawn_executable, @spawner_path}, port_opts)
+
+ {:os_pid, os_pid} = Port.info(port, :os_pid)
+ Exile.Watcher.watch(self(), os_pid, socket_path)
+
+ {stdin_fd, stdout_fd, stderr_fd} = receive_fds(sock, enable_stderr)
+
+ %{port: port, stdin: stdin_fd, stdout: stdout_fd, stderr: stderr_fd}
+ after
+ :socket.close(sock)
+ # best-effort cleanup: the socket file may not exist (e.g. when bind failed),
+ # and raising here would replace the error that actually caused the failure
+ _ = File.rm(socket_path)
+ end
+ end
+
+ @spec normalize_exec_args(nonempty_list(), keyword()) ::
+ {:ok, %{cmd_with_args: nonempty_list(), cd: charlist, env: env, enable_stderr: boolean}}
+ | {:error, String.t()}
+ def normalize_exec_args(cmd_with_args, opts) do
+ with {:ok, cmd} <- normalize_cmd(cmd_with_args),
+ {:ok, args} <- normalize_cmd_args(cmd_with_args),
+ :ok <- validate_opts_fields(opts),
+ {:ok, cd} <- normalize_cd(opts[:cd]),
+ {:ok, enable_stderr} <- normalize_enable_stderr(opts[:enable_stderr]),
+ {:ok, env} <- normalize_env(opts[:env]) do
+ {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, enable_stderr: enable_stderr}}
+ end
+ end
+
+ @socket_timeout 2000
+
+ # Accepts one connection on the listening socket and receives the file
+ # descriptors sent over it as a single `:rights` control message.
+ # Both the accept and the receive give up after `@socket_timeout` milliseconds;
+ # any unexpected result fails the pattern match and raises.
+ # Returns `{stdin, stdout, stderr}` as values produced by `Nif.nif_create_fd/1`;
+ # `stderr` is `nil` when `enable_stderr` is false.
+ @spec receive_fds(:socket.socket(), boolean) :: {Pipe.fd(), Pipe.fd(), Pipe.fd()}
+ defp receive_fds(lsock, enable_stderr) do
+ {:ok, sock} = :socket.accept(lsock, @socket_timeout)
+
+ try do
+ {:ok, msg} = :socket.recvmsg(sock, @socket_timeout)
+ %{ctrl: [%{data: data, level: :socket, type: :rights}]} = msg
+
+ # payload starts with three native-endian 32-bit descriptors: stdin, stdout, stderr
+ <<stdin_fd::native-32, stdout_fd::native-32, stderr_fd::native-32, _::binary>> = data
+
+ # FDs are managed by the NIF resource life-cycle
+ {:ok, stdout} = Nif.nif_create_fd(stdout_fd)
+ {:ok, stdin} = Nif.nif_create_fd(stdin_fd)
+
+ {:ok, stderr} =
+ if enable_stderr do
+ Nif.nif_create_fd(stderr_fd)
+ else
+ {:ok, nil}
+ end
+
+ {stdin, stdout, stderr}
+ after
+ # the accepted connection is closed whether or not the receive succeeded
+ :socket.close(sock)
+ end
+ end
+
+ # skip type warning till we change min OTP version to 24.
+ @dialyzer {:nowarn_function, socket_bind: 2}
+ defp socket_bind(sock, path) do
+ case :socket.bind(sock, %{family: :local, path: path}) do
+ :ok -> :ok
+ # for compatibility with OTP version < 24
+ {:ok, _} -> :ok
+ other -> other
+ end
+ end
+
+ @spec socket_path() :: String.t()
+ defp socket_path do
+ str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
+ path = Path.join(System.tmp_dir!(), str)
+ _ = :file.delete(path)
+ path
+ end
+
+ @spec prune_nils(keyword()) :: keyword()
+ defp prune_nils(kv) do
+ Enum.reject(kv, fn {_, v} -> is_nil(v) end)
+ end
+
+ @spec normalize_cmd(nonempty_list()) :: {:ok, nonempty_list()} | {:error, binary()}
+ defp normalize_cmd(arg) do
+ case arg do
+ [cmd | _] when is_binary(cmd) ->
+ path = System.find_executable(cmd)
+
+ if path do
+ {:ok, to_charlist(path)}
+ else
+ {:error, "command not found: #{inspect(cmd)}"}
+ end
+
+ _ ->
+ {:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
+ end
+ end
+
+ defp normalize_cmd_args([_ | args]) do
+ if Enum.all?(args, &is_binary/1) do
+ {:ok, Enum.map(args, &to_charlist/1)}
+ else
+ {:error, "command arguments must be list of strings. #{inspect(args)}"}
+ end
+ end
+
+ @spec normalize_cd(binary) :: {:ok, charlist()} | {:error, String.t()}
+ defp normalize_cd(cd) do
+ case cd do
+ nil ->
+ {:ok, ''}
+
+ cd when is_binary(cd) ->
+ if File.exists?(cd) && File.dir?(cd) do
+ {:ok, to_charlist(cd)}
+ else
+ {:error, "`:cd` must be valid directory path"}
+ end
+
+ _ ->
+ {:error, "`:cd` must be a binary string"}
+ end
+ end
+
+ @type env :: list({String.t(), String.t()})
+
+ @spec normalize_env(env) :: {:ok, env} | {:error, String.t()}
+ defp normalize_env(env) do
+ case env do
+ nil ->
+ {:ok, []}
+
+ env when is_list(env) or is_map(env) ->
+ env =
+ Enum.map(env, fn {key, value} ->
+ {to_charlist(key), to_charlist(value)}
+ end)
+
+ {:ok, env}
+
+ _ ->
+ {:error, "`:env` must be a map or list of `{string, string}`"}
+ end
+ end
+
+ @spec normalize_enable_stderr(enable_stderr :: boolean) :: {:ok, boolean} | {:error, String.t()}
+ defp normalize_enable_stderr(enable_stderr) do
+ case enable_stderr do
+ nil ->
+ {:ok, false}
+
+ enable_stderr when is_boolean(enable_stderr) ->
+ {:ok, enable_stderr}
+
+ _ ->
+ {:error, ":enable_stderr must be a boolean"}
+ end
+ end
+
+ @spec validate_opts_fields(keyword) :: :ok | {:error, String.t()}
+ defp validate_opts_fields(opts) do
+ {_, additional_opts} = Keyword.split(opts, [:cd, :env, :enable_stderr])
+
+ if Enum.empty?(additional_opts) do
+ :ok
+ else
+ {:error, "invalid opts: #{inspect(additional_opts)}"}
+ end
+ end
+end
diff --git a/lib/exile/process_nif.ex b/lib/exile/process/nif.ex
similarity index 95%
rename from lib/exile/process_nif.ex
rename to lib/exile/process/nif.ex
index 2da0f15..3fede38 100644
--- a/lib/exile/process_nif.ex
+++ b/lib/exile/process/nif.ex
@@ -1,21 +1,21 @@
-defmodule Exile.ProcessNif do
+defmodule Exile.Process.Nif do
@moduledoc false
@on_load :load_nifs
def load_nifs do
nif_path = :filename.join(:code.priv_dir(:exile), "exile")
:erlang.load_nif(nif_path, 0)
end
def nif_is_os_pid_alive(_os_pid), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_kill(_os_pid, _signal), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_read(_fd, _max_size), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_create_fd(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_close(_fd), do: :erlang.nif_error(:nif_library_not_loaded)
def nif_write(_fd, _bin), do: :erlang.nif_error(:nif_library_not_loaded)
end
diff --git a/lib/exile/process/operations.ex b/lib/exile/process/operations.ex
new file mode 100644
index 0000000..7f0391d
--- /dev/null
+++ b/lib/exile/process/operations.ex
@@ -0,0 +1,246 @@
+defmodule Exile.Process.Operations do
+ @moduledoc false
+
+ alias Exile.Process.Pipe
+ alias Exile.Process.State
+
+ @type t :: %__MODULE__{
+ write_stdin: write_operation() | nil,
+ read_stdout: read_operation() | nil,
+ read_stderr: read_operation() | nil,
+ read_stdout_or_stderr: read_any_operation() | nil
+ }
+
+ defstruct [:write_stdin, :read_stdout, :read_stderr, :read_stdout_or_stderr]
+
+ @spec new :: t
+ def new, do: %__MODULE__{}
+
+ @type write_operation ::
+ {:write_stdin, GenServer.from(), binary}
+
+ @type read_operation ::
+ {:read_stdout, GenServer.from(), non_neg_integer()}
+ | {:read_stderr, GenServer.from(), non_neg_integer()}
+
+ @type read_any_operation ::
+ {:read_stdout_or_stderr, GenServer.from(), non_neg_integer()}
+
+ @type operation :: write_operation() | read_operation() | read_any_operation()
+
+ @type name :: :write_stdin | :read_stdout | :read_stderr | :read_stdout_or_stderr
+
+ # Returns the operation stored under `name`, wrapped in `{:ok, _}`.
+ # Raises (`Map.fetch!/2`) when `name` is not one of the struct fields.
+ # NOTE(review): when nothing is pending this returns `{:ok, nil}`, unlike `pop/2`
+ # which returns `{:error, :operation_not_found}`; the `{:error, term}` branch of
+ # the spec is never produced here - confirm which behaviour callers expect.
+ @spec get(t, name) :: {:ok, operation()} | {:error, term}
+ def get(operations, name) do
+ {:ok, Map.fetch!(operations, name)}
+ end
+
+ @spec pop(t, name) :: {:ok, operation, t} | {:error, term}
+ def pop(operations, name) do
+ case Map.get(operations, name) do
+ nil ->
+ {:error, :operation_not_found}
+
+ operation ->
+ {:ok, operation, Map.put(operations, name, nil)}
+ end
+ end
+
+ @spec put(t, operation()) :: {:ok, t} | {:error, term}
+ def put(operations, operation) do
+ with {:ok, {op_name, _from, _arg} = operation} <- validate_operation(operation) do
+ {:ok, Map.put(operations, op_name, operation)}
+ end
+ end
+
+ # Tries to read from either output pipe. `pipe_name/1` maps this operation to
+ # `:stdout`, so stdout is tried first and stderr second.
+ # Any result other than `{:error, :eagain}` (data, `:eof`, or another error)
+ # falls out of the `with` and is returned as is.
+ @spec read_any(State.t(), read_any_operation()) ::
+ :eof
+ | {:noreply, State.t()}
+ | {:ok, {:stdout | :stderr, binary}}
+ | {:error, term}
+ def read_any(state, {:read_stdout_or_stderr, _from, _size} = operation) do
+ with {:ok, {_name, {caller, _}, arg}} <- validate_read_any_operation(operation),
+ first <- pipe_name(operation),
+ {:ok, primary} <- State.pipe(state, first),
+ second <- if(first == :stdout, do: :stderr, else: :stdout),
+ {:ok, secondary} <- State.pipe(state, second),
+ {:error, :eagain} <- do_read_any(caller, arg, primary, secondary),
+ {:ok, new_state} <- State.put_operation(state, operation) do
+ # nothing readable right now: the operation is parked in the state so it
+ # can be retried later
+ {:noreply, new_state}
+ end
+ end
+
+ # Reads from the pipe named by the operation (`:read_stdout` or `:read_stderr`).
+ # When the read would block (`{:error, :eagain}`) the operation is stored in the
+ # state and `{:noreply, new_state}` is returned; every other result (data, `:eof`,
+ # validation or pipe errors) falls out of the `with` and is returned unchanged.
+ @spec read(State.t(), read_operation()) ::
+ :eof
+ | {:noreply, State.t()}
+ | {:ok, binary}
+ | {:error, term}
+ def read(state, operation) do
+ with {:ok, {_name, {caller, _}, arg}} <- validate_read_operation(operation),
+ {:ok, pipe} <- State.pipe(state, pipe_name(operation)),
+ {:error, :eagain} <- Pipe.read(pipe, arg, caller),
+ {:ok, new_state} <- State.put_operation(state, operation) do
+ {:noreply, new_state}
+ end
+ end
+
+ # Writes the operation's binary to stdin on behalf of the caller.
+ # A successful write is handed to `handle_successful_write/3`, which decides
+ # between `:ok` and parking the unwritten remainder. When the write would block
+ # (`{:error, :eagain}`) the whole operation is stored in the state and
+ # `{:noreply, new_state}` is returned. Any other result is returned unchanged.
+ @spec write(State.t(), write_operation()) ::
+ :ok
+ | {:noreply, State.t()}
+ | {:error, :epipe}
+ | {:error, term}
+ def write(state, operation) do
+ with {:ok, {_name, {caller, _}, bin}} <- validate_write_operation(operation),
+ pipe_name <- pipe_name(operation),
+ {:ok, pipe} <- State.pipe(state, pipe_name) do
+ case Pipe.write(pipe, bin, caller) do
+ {:ok, size} ->
+ handle_successful_write(state, size, operation)
+
+ {:error, :eagain} ->
+ case State.put_operation(state, operation) do
+ {:ok, new_state} ->
+ {:noreply, new_state}
+
+ error ->
+ error
+ end
+
+ ret ->
+ ret
+ end
+ end
+ end
+
+ # Finds the name of the pending operation that can make progress on `pipe_name`.
+ # Clauses are checked in order, so a pending `:read_stdout_or_stderr` takes
+ # precedence over a pending `:read_stdout` / `:read_stderr` for the same pipe.
+ # Returns `{:error, :no_pending_operation}` when nothing is waiting on that pipe.
+ @spec match_pending_operation(State.t(), Pipe.name()) ::
+ {:ok, name} | {:error, :no_pending_operation}
+ # credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
+ def match_pending_operation(state, pipe_name) do
+ cond do
+ state.operations.read_stdout_or_stderr &&
+ pipe_name in [:stdout, :stderr] ->
+ {:ok, :read_stdout_or_stderr}
+
+ state.operations.read_stdout &&
+ pipe_name == :stdout ->
+ {:ok, :read_stdout}
+
+ state.operations.read_stderr &&
+ pipe_name == :stderr ->
+ {:ok, :read_stderr}
+
+ state.operations.write_stdin &&
+ pipe_name == :stdin ->
+ {:ok, :write_stdin}
+
+ true ->
+ {:error, :no_pending_operation}
+ end
+ end
+
+ @spec handle_successful_write(State.t(), non_neg_integer(), write_operation()) ::
+ :ok | {:noreply, State.t()} | {:error, term}
+ defp handle_successful_write(state, written_size, {name, from, bin}) do
+ bin_size = byte_size(bin)
+
+ # check if it is partial write
+ if written_size < bin_size do
+ new_bin = binary_part(bin, written_size, bin_size - written_size)
+
+ case State.put_operation(state, {name, from, new_bin}) do
+ {:ok, new_state} ->
+ {:noreply, new_state}
+
+ error ->
+ error
+ end
+ else
+ :ok
+ end
+ end
+
+ @spec do_read_any(pid, non_neg_integer(), Pipe.t(), Pipe.t()) ::
+ :eof | {:ok, {Pipe.name(), binary}} | {:error, term}
+ # Reads from `primary`; when that yields no data (EOF, would-block, or the pipe
+ # is not readable by `caller`) falls back to `secondary`.
+ # `:eof` is returned only when no pipe can deliver more data; a would-block on
+ # either side is reported as `{:error, :eagain}` so the caller keeps waiting.
+ defp do_read_any(caller, size, primary, secondary) do
+ case Pipe.read(primary, size, caller) do
+ ret1 when ret1 in [:eof, {:error, :eagain}, {:error, :pipe_closed_or_invalid_caller}] ->
+ case {ret1, Pipe.read(secondary, size, caller)} do
+ {:eof, :eof} ->
+ :eof
+
+ {_, {:ok, bin}} ->
+ {:ok, {secondary.name, bin}}
+
+ # secondary is unusable for this caller: the primary's result decides
+ {ret1, {:error, :pipe_closed_or_invalid_caller}} ->
+ ret1
+
+ # primary is still open but has nothing to read yet: the secondary reaching
+ # EOF must not end the whole read, keep waiting on the primary instead
+ {{:error, :eagain}, :eof} ->
+ {:error, :eagain}
+
+ {_, ret2} ->
+ ret2
+ end
+
+ {:ok, bin} ->
+ {:ok, {primary.name, bin}}
+
+ ret1 ->
+ ret1
+ end
+ end
+
+ @spec validate_read_any_operation(operation) ::
+ {:ok, read_any_operation()} | {:error, :invalid_operation}
+ defp validate_read_any_operation(operation) do
+ case operation do
+ {:read_stdout_or_stderr, _from, size} when is_integer(size) and size >= 0 ->
+ {:ok, operation}
+
+ _ ->
+ {:error, :invalid_operation}
+ end
+ end
+
+ @spec validate_read_operation(operation) ::
+ {:ok, read_operation()} | {:error, :invalid_operation}
+ defp validate_read_operation(operation) do
+ case operation do
+ {:read_stdout, _from, size} when is_integer(size) and size >= 0 ->
+ {:ok, operation}
+
+ {:read_stderr, _from, size} when is_integer(size) and size >= 0 ->
+ {:ok, operation}
+
+ _ ->
+ {:error, :invalid_operation}
+ end
+ end
+
+ @spec validate_write_operation(operation) ::
+ {:ok, write_operation()} | {:error, :invalid_operation}
+ defp validate_write_operation(operation) do
+ case operation do
+ {:write_stdin, _from, bin} when is_binary(bin) ->
+ {:ok, operation}
+
+ _ ->
+ {:error, :invalid_operation}
+ end
+ end
+
+ @spec validate_operation(operation) :: {:ok, operation()} | {:error, :invalid_operation}
+ defp validate_operation(operation) do
+ with {:error, :invalid_operation} <- validate_read_operation(operation),
+ {:error, :invalid_operation} <- validate_read_any_operation(operation) do
+ validate_write_operation(operation)
+ end
+ end
+
+ @spec pipe_name(operation()) :: :stdin | :stdout | :stderr
+ defp pipe_name({op, _from, _}) do
+ case op do
+ :write_stdin -> :stdin
+ :read_stdout -> :stdout
+ :read_stderr -> :stderr
+ :read_stdout_or_stderr -> :stdout
+ end
+ end
+end
diff --git a/lib/exile/process/pipe.ex b/lib/exile/process/pipe.ex
new file mode 100644
index 0000000..c4d0a5a
--- /dev/null
+++ b/lib/exile/process/pipe.ex
@@ -0,0 +1,91 @@
+defmodule Exile.Process.Pipe do
+ @moduledoc false
+
+ alias Exile.Process.Nif
+
+ @type name :: Exile.Process.pipe_name()
+
+ @type fd :: non_neg_integer()
+
+ @type t :: %__MODULE__{
+ name: name,
+ fd: pos_integer() | nil,
+ monitor_ref: reference() | nil,
+ owner: pid | nil,
+ status: :open | :closed
+ }
+
+ defstruct [:name, :fd, :monitor_ref, :owner, status: :init]
+
+ alias __MODULE__
+
+ @spec new(name, pos_integer, pid) :: t
+ def new(name, fd, owner) do
+ if name in [:stdin, :stdout, :stderr] do
+ ref = Process.monitor(owner)
+ %Pipe{name: name, fd: fd, status: :open, owner: owner, monitor_ref: ref}
+ else
+ raise "invalid pipe name"
+ end
+ end
+
+ @spec new(name) :: t
+ def new(name) do
+ if name in [:stdin, :stdout, :stderr] do
+ %Pipe{name: name, status: :closed}
+ else
+ raise "invalid pipe name"
+ end
+ end
+
+ @spec open?(t) :: boolean()
+ def open?(pipe), do: pipe.status == :open
+
+ # Reads up to `size` bytes from the pipe on behalf of `caller`.
+ # Only the pipe's current owner may read; a closed pipe has no owner (see
+ # `new/1` and `close/2`), so reading it returns the same error.
+ @spec read(t, non_neg_integer, pid) :: :eof | {:ok, binary} | {:error, :eagain} | {:error, term}
+ def read(pipe, size, caller) do
+ if caller != pipe.owner do
+ {:error, :pipe_closed_or_invalid_caller}
+ else
+ case Nif.nif_read(pipe.fd, size) do
+ # normalize return value
+ {:ok, <<>>} -> :eof
+ ret -> ret
+ end
+ end
+ end
+
+ @spec write(t, binary, pid) :: {:ok, size :: non_neg_integer()} | {:error, term}
+ def write(pipe, bin, caller) do
+ if caller != pipe.owner do
+ {:error, :pipe_closed_or_invalid_caller}
+ else
+ Nif.nif_write(pipe.fd, bin)
+ end
+ end
+
+ # Closes the pipe on behalf of `caller`, who must be the current owner.
+ # Drops the owner monitor (flushing an already delivered `:DOWN` message) and
+ # closes the descriptor; the result of `Nif.nif_close/1` is not inspected.
+ # The returned pipe is marked `:closed` with its owner and monitor cleared.
+ @spec close(t, pid) :: {:ok, t} | {:error, :pipe_closed_or_invalid_caller}
+ def close(pipe, caller) do
+ if caller != pipe.owner do
+ {:error, :pipe_closed_or_invalid_caller}
+ else
+ Process.demonitor(pipe.monitor_ref, [:flush])
+ Nif.nif_close(pipe.fd)
+ pipe = %Pipe{pipe | status: :closed, monitor_ref: nil, owner: nil}
+
+ {:ok, pipe}
+ end
+ end
+
+ # Transfers ownership of an open pipe to `new_owner`.
+ # The new owner is monitored before the previous monitor is dropped (and its
+ # pending `:DOWN` message, if any, flushed). Returns `{:error, :closed}` for a
+ # pipe that is not open.
+ @spec set_owner(t, pid) :: {:ok, t} | {:error, :closed}
+ def set_owner(pipe, new_owner) do
+ if pipe.status == :open do
+ ref = Process.monitor(new_owner)
+ Process.demonitor(pipe.monitor_ref, [:flush])
+ pipe = %Pipe{pipe | owner: new_owner, monitor_ref: ref}
+
+ {:ok, pipe}
+ else
+ {:error, :closed}
+ end
+ end
+end
diff --git a/lib/exile/process/state.ex b/lib/exile/process/state.ex
new file mode 100644
index 0000000..8775f8e
--- /dev/null
+++ b/lib/exile/process/state.ex
@@ -0,0 +1,99 @@
+defmodule Exile.Process.State do
+ @moduledoc false
+
+ alias Exile.Process.Exec
+ alias Exile.Process.Operations
+ alias Exile.Process.Pipe
+
+ alias __MODULE__
+
+ @type read_mode :: :stdout | :stderr | :stdout_or_stderr
+
+ @type pipes :: %{
+ stdin: Pipe.t(),
+ stdout: Pipe.t(),
+ stderr: Pipe.t()
+ }
+
+ @type status ::
+ :init
+ | :running
+ | {:exit, non_neg_integer()}
+ | {:exit, {:error, error :: term}}
+
+ @type t :: %__MODULE__{
+ args: Exec.args(),
+ owner: pid,
+ port: port(),
+ pipes: pipes,
+ status: status,
+ enable_stderr: boolean(),
+ operations: Operations.t(),
+ exit_ref: reference(),
+ monitor_ref: reference()
+ }
+
+ defstruct [
+ :args,
+ :owner,
+ :port,
+ :pipes,
+ :status,
+ :enable_stderr,
+ :operations,
+ :exit_ref,
+ :monitor_ref
+ ]
+
+ alias __MODULE__
+
+ @spec pipe(State.t(), name :: Pipe.name()) :: {:ok, Pipe.t()} | {:error, :invalid_name}
+ def pipe(%State{} = state, name) do
+ if name in [:stdin, :stdout, :stderr] do
+ {:ok, Map.fetch!(state.pipes, name)}
+ else
+ {:error, :invalid_name}
+ end
+ end
+
+ @spec put_pipe(State.t(), name :: Pipe.name(), Pipe.t()) :: {:ok, t} | {:error, :invalid_name}
+ def put_pipe(%State{} = state, name, pipe) do
+ if name in [:stdin, :stdout, :stderr] do
+ pipes = Map.put(state.pipes, name, pipe)
+ state = %State{state | pipes: pipes}
+ {:ok, state}
+ else
+ {:error, :invalid_name}
+ end
+ end
+
+ # Returns the name of the pipe in `state.pipes` whose `fd` equals the given one.
+ # Raises when no pipe matches: `Enum.find/2` returns `nil` in that case and
+ # `nil` has no `name` field.
+ @spec pipe_name_for_fd(State.t(), fd :: Pipe.fd()) :: Pipe.name()
+ def pipe_name_for_fd(state, fd) do
+ pipe =
+ state.pipes
+ |> Map.values()
+ |> Enum.find(&(&1.fd == fd))
+
+ pipe.name
+ end
+
+ @spec put_operation(State.t(), Operations.operation()) :: {:ok, t} | {:error, term}
+ def put_operation(%State{operations: ops} = state, operation) do
+ with {:ok, ops} <- Operations.put(ops, operation) do
+ {:ok, %State{state | operations: ops}}
+ end
+ end
+
+ @spec pop_operation(State.t(), Operations.name()) ::
+ {:ok, Operations.operation(), t} | {:error, term}
+ def pop_operation(%State{operations: ops} = state, name) do
+ with {:ok, operation, ops} <- Operations.pop(ops, name) do
+ {:ok, operation, %State{state | operations: ops}}
+ end
+ end
+
+ @spec set_status(State.t(), status) :: State.t()
+ def set_status(state, status) do
+ %State{state | status: status}
+ end
+end
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index 9485572..a4aa52f 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,205 +1,267 @@
defmodule Exile.Stream do
@moduledoc """
Defines a `Exile.Stream` struct returned by `Exile.stream!/2`.
"""
alias Exile.Process
alias Exile.Process.Error
+ require Logger
+
defmodule Sink do
@moduledoc false
- defstruct [:process]
+ @type t :: %__MODULE__{process: Process.t(), ignore_epipe: boolean}
+
+ defstruct [:process, :ignore_epipe]
defimpl Collectable do
- def into(%{process: process} = stream) do
+ def into(%{process: process}) do
collector_fun = fn
:ok, {:cont, x} ->
- :ok = Process.write(process, x)
+ case Process.write(process, x) do
+ {:error, :epipe} ->
+ # there is no other way to stop a Collectable than to
+ # raise error, we catch this error and return `{:error, :epipe}`
+ raise Error, "epipe"
+
+ :ok ->
+ :ok
+ end
- :ok, :done ->
- :ok = Process.close_stdin(process)
- stream
+ acc, :done ->
+ acc
- :ok, :halt ->
- :ok = Process.close_stdin(process)
+ acc, :halt ->
+ acc
end
{:ok, collector_fun}
end
end
end
- defstruct [:process, :stream_opts]
+ defstruct [:process, :stream_opts, :writer_task]
- @type t :: %__MODULE__{}
+ @typedoc "Struct members are private, do not depend on them"
+ @type t :: %__MODULE__{process: Process.t(), stream_opts: map, writer_task: Task.t()}
@doc false
+ @spec __build__(nonempty_list(String.t()), keyword()) :: t()
def __build__(cmd_with_args, opts) do
{stream_opts, process_opts} =
- Keyword.split(opts, [:exit_timeout, :max_chunk_size, :input, :use_stderr])
-
- with {:ok, stream_opts} <- normalize_stream_opts(stream_opts) do
- process_opts = Keyword.put(process_opts, :use_stderr, stream_opts[:use_stderr])
- {:ok, process} = Process.start_link(cmd_with_args, process_opts)
- start_input_streamer(%Sink{process: process}, stream_opts.input)
- %Exile.Stream{process: process, stream_opts: stream_opts}
- else
- {:error, error} -> raise ArgumentError, message: error
+ Keyword.split(opts, [:exit_timeout, :max_chunk_size, :input, :enable_stderr, :ignore_epipe])
+
+ case normalize_stream_opts(stream_opts) do
+ {:ok, stream_opts} ->
+ process_opts = Keyword.put(process_opts, :enable_stderr, stream_opts[:enable_stderr])
+ {:ok, process} = Process.start_link(cmd_with_args, process_opts)
+
+ writer_task =
+ start_input_streamer(
+ %Sink{process: process, ignore_epipe: stream_opts[:ignore_epipe]},
+ stream_opts.input
+ )
+
+ %Exile.Stream{process: process, stream_opts: stream_opts, writer_task: writer_task}
+
+ {:error, error} ->
+ raise ArgumentError, message: error
end
end
@doc false
- defp start_input_streamer(sink, input) do
+ @spec start_input_streamer(term, term) :: Task.t()
+ defp start_input_streamer(%Sink{process: process} = sink, input) do
case input do
:no_input ->
- :ok
+ # use `Task.completed(:ok)` when bumping min Elixir requirement
+ Task.async(fn -> :ok end)
{:enumerable, enum} ->
- spawn_link(fn ->
- Enum.into(enum, sink)
+ Task.async(fn ->
+ Process.change_pipe_owner(process, :stdin, self())
+
+ try do
+ Enum.into(enum, sink)
+ rescue
+ Error ->
+ {:error, :epipe}
+ end
end)
{:collectable, func} ->
- spawn_link(fn ->
- func.(sink)
+ Task.async(fn ->
+ Process.change_pipe_owner(process, :stdin, self())
+
+ try do
+ func.(sink)
+ rescue
+ Error ->
+ {:error, :epipe}
+ end
end)
end
end
defimpl Enumerable do
+ # Streams the command's output chunk by chunk. The accumulator stays
+ # `:premature_exit` until `read_any` reports `:eof`, so `after_fun` can
+ # tell a fully drained stream from a reader that stopped early.
+ # credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def reduce(arg, acc, fun) do
- %{process: process, stream_opts: %{use_stderr: use_stderr} = stream_opts} = arg
-
- start_fun = fn -> :normal end
-
- next_fun = fn :normal ->
+ %{
+ process: process,
+ stream_opts:
+ %{
+ enable_stderr: enable_stderr,
+ ignore_epipe: ignore_epipe
+ } = stream_opts,
+ writer_task: writer_task
+ } = arg
+
+ start_fun = fn -> :premature_exit end
+
+ next_fun = fn :premature_exit ->
case Process.read_any(process, stream_opts.max_chunk_size) do
:eof ->
- {:halt, :normal}
+ {:halt, :normal_exit}
- {:ok, {:stdout, x}} when use_stderr == false ->
- {[IO.iodata_to_binary(x)], :normal}
+ {:ok, {:stdout, x}} when enable_stderr == false ->
+ {[IO.iodata_to_binary(x)], :premature_exit}
- {:ok, {stream, x}} when use_stderr == true ->
- {[{stream, IO.iodata_to_binary(x)}], :normal}
+ {:ok, {stream, x}} when enable_stderr == true ->
+ {[{stream, IO.iodata_to_binary(x)}], :premature_exit}
{:error, errno} ->
- raise Error, "Failed to read from the external process. errno: #{errno}"
+ raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
end
end
after_fun = fn exit_type ->
- try do
- # always close stdin before stopping to give the command chance to exit properly
- Process.close_stdin(process)
- result = Process.await_exit(process, stream_opts.exit_timeout)
+ result = Process.await_exit(process, stream_opts.exit_timeout)
+ writer_task_status = Task.await(writer_task)
- case {exit_type, result} do
- {_, :timeout} ->
- Process.kill(process, :sigkill)
- raise Error, "command fail to exit within timeout: #{stream_opts[:exit_timeout]}"
+ case {exit_type, result, writer_task_status} do
+ # if reader exit early and there is a pending write
+ {:premature_exit, {:ok, _status}, {:error, :epipe}} when ignore_epipe ->
+ :ok
- {:normal, {:ok, {:exit, 0}}} ->
- :ok
+ # if reader exit early and there is no pending write or if
+ # there is no writer
+ {:premature_exit, {:ok, _status}, :ok} when ignore_epipe ->
+ :ok
- {:normal, {:ok, error}} ->
- raise Error, "command exited with status: #{inspect(error)}"
+ # if we get epipe from writer then raise that error, and ignore exit status
+ {:premature_exit, {:ok, _status}, {:error, :epipe}} when ignore_epipe == false ->
+ raise Error, "abnormal command exit, received EPIPE while writing to stdin"
- {exit_type, error} ->
- Process.kill(process, :sigkill)
- raise Error, "command exited with exit_type: #{exit_type}, error: #{inspect(error)}"
- end
- after
- Process.stop(process)
+ # Normal exit success case
+ {_, {:ok, 0}, _} ->
+ :ok
+
+ {:normal_exit, {:ok, error}, _} ->
+ raise Error, "command exited with status: #{inspect(error)}"
+
+ # reader stopped early and the command exited with a non-zero
+ # status that no clause above accounts for; raise `Error`
+ # instead of failing with a `CaseClauseError`
+ {:premature_exit, {:ok, status}, _} ->
+ raise Error, "command exited with status: #{inspect(status)}"
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
def count(_stream) do
{:error, __MODULE__}
end
def member?(_stream, _term) do
{:error, __MODULE__}
end
def slice(_stream) do
{:error, __MODULE__}
end
end
+ @spec normalize_input(term) ::
+ {:ok, :no_input} | {:ok, {:enumerable, term}} | {:ok, {:collectable, function}}
defp normalize_input(term) do
cond do
is_nil(term) ->
{:ok, :no_input}
- !is_function(term) && Enumerable.impl_for(term) ->
+ !is_function(term, 1) && Enumerable.impl_for(term) ->
{:ok, {:enumerable, term}}
is_function(term, 1) ->
{:ok, {:collectable, term}}
true ->
{:error, "`:input` must be either Enumerable or a function which accepts collectable"}
end
end
defp normalize_max_chunk_size(max_chunk_size) do
case max_chunk_size do
nil ->
- {:ok, 65536}
+ {:ok, 65_536}
max_chunk_size when is_integer(max_chunk_size) and max_chunk_size > 0 ->
{:ok, max_chunk_size}
_ ->
{:error, ":max_chunk_size must be a positive integer"}
end
end
+ # Validates `:exit_timeout`. `nil` selects the default of 5000; otherwise
+ # only a positive integer passes the guard (`:infinity` is rejected).
defp normalize_exit_timeout(timeout) do
case timeout do
nil ->
- {:ok, :infinity}
+ {:ok, 5000}
timeout when is_integer(timeout) and timeout > 0 ->
{:ok, timeout}
_ ->
- {:error, ":exit_timeout must be either :infinity or an integer"}
+ {:error, ":exit_timeout must be a positive integer"}
end
end
- defp normalize_use_stderr(use_stderr) do
- case use_stderr do
+ defp normalize_enable_stderr(enable_stderr) do
+ case enable_stderr do
nil ->
{:ok, false}
- use_stderr when is_boolean(use_stderr) ->
- {:ok, use_stderr}
+ enable_stderr when is_boolean(enable_stderr) ->
+ {:ok, enable_stderr}
_ ->
- {:error, ":use_stderr must be a boolean"}
+ {:error, ":enable_stderr must be a boolean"}
end
end
- defp normalize_stream_opts(opts) when is_list(opts) do
+ # Validates `:ignore_epipe`: a missing option defaults to `false`, a
+ # boolean is passed through, anything else is rejected.
+ defp normalize_ignore_epipe(nil), do: {:ok, false}
+
+ defp normalize_ignore_epipe(flag) when is_boolean(flag), do: {:ok, flag}
+
+ defp normalize_ignore_epipe(_other), do: {:error, ":ignore_epipe must be a boolean"}
+
+ defp normalize_stream_opts(opts) do
with {:ok, input} <- normalize_input(opts[:input]),
{:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]),
{:ok, max_chunk_size} <- normalize_max_chunk_size(opts[:max_chunk_size]),
- {:ok, use_stderr} <- normalize_use_stderr(opts[:use_stderr]) do
+ {:ok, enable_stderr} <- normalize_enable_stderr(opts[:enable_stderr]),
+ {:ok, ignore_epipe} <- normalize_ignore_epipe(opts[:ignore_epipe]) do
{:ok,
%{
input: input,
exit_timeout: exit_timeout,
max_chunk_size: max_chunk_size,
- use_stderr: use_stderr
+ enable_stderr: enable_stderr,
+ ignore_epipe: ignore_epipe
}}
end
end
-
- defp normalize_stream_opts(_), do: {:error, "stream_opts must be a keyword list"}
end
diff --git a/lib/exile/watcher.ex b/lib/exile/watcher.ex
index 7e7def9..f659cb1 100644
--- a/lib/exile/watcher.ex
+++ b/lib/exile/watcher.ex
@@ -1,79 +1,82 @@
defmodule Exile.Watcher do
@moduledoc false
use GenServer, restart: :temporary
+
require Logger
- alias Exile.ProcessNif, as: Nif
+ alias Exile.Process.Nif, as: Nif
def start_link(args) do
{:ok, _pid} = GenServer.start_link(__MODULE__, args)
end
def watch(pid, os_pid, socket_path) do
spec = {Exile.Watcher, %{pid: pid, os_pid: os_pid, socket_path: socket_path}}
DynamicSupervisor.start_child(Exile.WatcherSupervisor, spec)
end
+ @impl true
def init(args) do
%{pid: pid, os_pid: os_pid, socket_path: socket_path} = args
Process.flag(:trap_exit, true)
ref = Elixir.Process.monitor(pid)
{:ok, %{pid: pid, os_pid: os_pid, socket_path: socket_path, ref: ref}}
end
+ @impl true
def handle_info({:DOWN, ref, :process, pid, _reason}, %{
pid: pid,
socket_path: socket_path,
os_pid: os_pid,
ref: ref
}) do
- File.rm!(socket_path)
- attempt_graceful_exit(os_pid)
+ _ = File.rm(socket_path)
+ # at max we wait for 50ms for program to exit
+ if process_exit?(os_pid, 50) do
+ Logger.debug("External program exited automatically")
+ else
+ attempt_graceful_exit(os_pid)
+ end
+
{:stop, :normal, nil}
end
def handle_info({:EXIT, _, reason}, nil), do: {:stop, reason, nil}
# when watcher is attempted to be killed, we forcefully kill external os process.
# This can happen when beam receive SIGTERM
def handle_info({:EXIT, _, reason}, %{pid: pid, socket_path: socket_path, os_pid: os_pid}) do
Logger.debug(fn -> "Watcher exiting. reason: #{inspect(reason)}" end)
- File.rm!(socket_path)
+ # socket removal is best-effort: a missing file must not raise and
+ # prevent the external program from being stopped below
+ _ = File.rm(socket_path)
- Exile.Process.stop(pid)
+ Elixir.Process.exit(pid, :watcher_exit)
attempt_graceful_exit(os_pid)
{:stop, reason, nil}
end
defp attempt_graceful_exit(os_pid) do
- try do
- Logger.debug(fn -> "Stopping external program" end)
-
- # at max we wait for 100ms for program to exit
- process_exit?(os_pid, 100) && throw(:done)
+ Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
+ Nif.nif_kill(os_pid, :sigterm)
+ process_exit?(os_pid, 100) && throw(:done)
- Logger.debug("Failed to stop external program gracefully. attempting SIGTERM")
- Nif.nif_kill(os_pid, :sigterm)
- process_exit?(os_pid, 100) && throw(:done)
+ Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
+ Nif.nif_kill(os_pid, :sigkill)
+ process_exit?(os_pid, 200) && throw(:done)
- Logger.debug("Failed to stop external program with SIGTERM. attempting SIGKILL")
- Nif.nif_kill(os_pid, :sigkill)
- process_exit?(os_pid, 200) && throw(:done)
-
- Logger.error("failed to kill external process")
- raise "Failed to kill external process"
- catch
- :done -> Logger.debug(fn -> "External program exited successfully" end)
- end
+ Logger.error("failed to kill external process")
+ raise "Failed to kill external process"
+ catch
+ :done ->
+ Logger.debug(fn -> "External program exited successfully" end)
end
defp process_exit?(os_pid), do: !Nif.nif_is_os_pid_alive(os_pid)
defp process_exit?(os_pid, timeout) do
if process_exit?(os_pid) do
true
else
:timer.sleep(timeout)
process_exit?(os_pid)
end
end
end
diff --git a/mix.exs b/mix.exs
index d6f5d57..26bc97f 100644
--- a/mix.exs
+++ b/mix.exs
@@ -1,59 +1,64 @@
defmodule Exile.MixProject do
use Mix.Project
@version "0.1.0"
@scm_url "https://github.com/akash-akya/exile"
def project do
[
app: :exile,
version: @version,
elixir: "~> 1.7",
start_permanent: Mix.env() == :prod,
compilers: [:elixir_make] ++ Mix.compilers(),
make_targets: ["all"],
make_clean: ["clean"],
deps: deps(),
# Package
package: package(),
description: description(),
# Docs
source_url: @scm_url,
homepage_url: @scm_url,
docs: [
main: "readme",
source_ref: "v#{@version}",
extras: ["README.md", "LICENSE.md"]
]
]
end
def application do
[
mod: {Exile, []},
extra_applications: [:logger, :crypto]
]
end
defp description do
"NIF based solution to interact with external programs with back-pressure"
end
defp package do
[
maintainers: ["Akash Hiremath"],
licenses: ["Apache-2.0"],
files: ~w(lib .formatter.exs mix.exs README* LICENSE* Makefile c_src/*.{h,c}),
links: %{GitHub: @scm_url}
]
end
defp deps do
[
{:elixir_make, "~> 0.6", runtime: false},
- {:ex_doc, ">= 0.0.0", only: :dev}
+
+ # development & test
+ {:credo, "~> 1.6", only: [:dev, :test], runtime: false},
+ {:ex_doc, ">= 0.0.0", only: :dev},
+ {:excoveralls, "~> 0.15", only: :test},
+ {:dialyxir, "~> 1.0", only: [:dev], runtime: false}
]
end
end
diff --git a/mix.lock b/mix.lock
index e2cd6b9..c709796 100644
--- a/mix.lock
+++ b/mix.lock
@@ -1,9 +1,24 @@
%{
- "earmark_parser": {:hex, :earmark_parser, "1.4.29", "149d50dcb3a93d9f3d6f3ecf18c918fb5a2d3c001b5d3305c926cddfbd33355b", [:mix], [], "hexpm", "4902af1b3eb139016aed210888748db8070b8125c2342ce3dcae4f38dcc63503"},
- "elixir_make": {:hex, :elixir_make, "0.6.3", "bc07d53221216838d79e03a8019d0839786703129599e9619f4ab74c8c096eac", [:mix], [], "hexpm", "f5cbd651c5678bcaabdbb7857658ee106b12509cd976c2c2fca99688e1daf716"},
- "ex_doc": {:hex, :ex_doc, "0.29.0", "4a1cb903ce746aceef9c1f9ae8a6c12b742a5461e6959b9d3b24d813ffbea146", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "f096adb8bbca677d35d278223361c7792d496b3fc0d0224c9d4bc2f651af5db1"},
+ "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
+ "certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
+ "credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"},
+ "dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"},
+ "earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"},
+ "elixir_make": {:hex, :elixir_make, "0.7.6", "67716309dc5d43e16b5abbd00c01b8df6a0c2ab54a8f595468035a50189f9169", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5a0569756b0f7873a77687800c164cca6dfc03a09418e6fcf853d78991f49940"},
+ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
+ "ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
+ "excoveralls": {:hex, :excoveralls, "0.16.1", "0bd42ed05c7d2f4d180331a20113ec537be509da31fed5c8f7047ce59ee5a7c5", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dae763468e2008cf7075a64cb1249c97cb4bc71e236c5c2b5e5cdf1cfa2bf138"},
+ "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
+ "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~> 2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
+ "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
+ "jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
- "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
+ "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
- "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
+ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
+ "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
+ "nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"},
+ "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
+ "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
+ "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
}
diff --git a/test/exile/process_nif_test.exs b/test/exile/process_nif_test.exs
deleted file mode 100644
index 19c2584..0000000
--- a/test/exile/process_nif_test.exs
+++ /dev/null
@@ -1,3 +0,0 @@
-defmodule Exile.ProcessNifTest do
- use ExUnit.Case, async: false
-end
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index d079ec0..1fc4ba1 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,410 +1,636 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
+
alias Exile.Process
+ alias Exile.Process.{Pipe, State}
- test "read" do
- {:ok, s} = Process.start_link(~w(echo test))
- assert {:ok, iodata} = Process.read(s, 100)
- assert :eof = Process.read(s, 100)
- assert IO.iodata_to_binary(iodata) == "test\n"
- assert :ok == Process.close_stdin(s)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
- Process.stop(s)
- end
+ doctest Exile.Process
- test "write" do
- {:ok, s} = Process.start_link(~w(cat))
- assert :ok == Process.write(s, "hello")
- assert {:ok, iodata} = Process.read(s, 5)
- assert IO.iodata_to_binary(iodata) == "hello"
-
- assert :ok == Process.write(s, "world")
- assert {:ok, iodata} = Process.read(s, 5)
- assert IO.iodata_to_binary(iodata) == "world"
-
- assert :ok == Process.close_stdin(s)
- assert :eof == Process.read(s)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
- Process.stop(s)
- end
+ describe "pipes" do
+ test "reading from stdout" do
+ {:ok, s} = Process.start_link(~w(echo test))
+ :timer.sleep(100)
- test "stdin close" do
- logger = start_events_collector()
+ assert {:ok, iodata} = Process.read(s, 100)
+ assert :eof = Process.read(s, 100)
+ assert IO.iodata_to_binary(iodata) == "test\n"
- # base64 produces output only after getting EOF from stdin. we
- # collect events in order and assert that we can still read from
- # stdout even after closing stdin
- {:ok, s} = Process.start_link(~w(base64))
+ assert :ok == Process.close_stdin(s)
+ assert :ok == Process.close_stdout(s)
- # parallel reader should be blocked till we close stdin
- start_parallel_reader(s, logger)
- :timer.sleep(100)
+ assert {:ok, 0} == Process.await_exit(s, 500)
- assert :ok == Process.write(s, "hello")
- add_event(logger, {:write, "hello"})
- assert :ok == Process.write(s, "world")
- add_event(logger, {:write, "world"})
- :timer.sleep(100)
+ refute Elixir.Process.alive?(s.pid)
+ end
- assert :ok == Process.close_stdin(s)
- add_event(logger, :input_close)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
- Process.stop(s)
+ test "write to stdin" do
+ {:ok, s} = Process.start_link(~w(cat))
- assert [
- {:write, "hello"},
- {:write, "world"},
- :input_close,
- {:read, "aGVsbG93b3JsZA==\n"},
- :eof
- ] == get_events(logger)
- end
+ assert :ok == Process.write(s, "hello")
+ assert {:ok, iodata} = Process.read(s, 5)
+ assert IO.iodata_to_binary(iodata) == "hello"
- test "external command termination on stop" do
- {:ok, s} = Process.start_link(~w(cat))
- {:ok, os_pid} = Process.os_pid(s)
- assert os_process_alive?(os_pid)
+ assert :ok == Process.write(s, "world")
+ assert {:ok, iodata} = Process.read(s, 5)
+ assert IO.iodata_to_binary(iodata) == "world"
- Process.stop(s)
- :timer.sleep(100)
+ assert :ok == Process.close_stdin(s)
+ assert :eof == Process.read(s)
- refute os_process_alive?(os_pid)
- end
+ assert {:ok, 0} == Process.await_exit(s, 100)
- test "external command kill on stop" do
- {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ :timer.sleep(100)
+ refute Elixir.Process.alive?(s.pid)
+ end
- {:ok, os_pid} = Process.os_pid(s)
- assert os_process_alive?(os_pid)
- Process.stop(s)
+ test "when stdin is closed" do
+ logger = start_events_collector()
+
+ # base64 produces output only after getting EOF from stdin. we
+ # collect events in order and assert that we can still read from
+ # stdout even after closing stdin
+ {:ok, s} = Process.start_link(~w(base64))
+
+ # parallel reader should be blocked till we close stdin
+ start_parallel_reader(s, logger)
+ :timer.sleep(100)
+
+ assert :ok == Process.write(s, "hello")
+ add_event(logger, {:write, "hello"})
+ assert :ok == Process.write(s, "world")
+ add_event(logger, {:write, "world"})
+ :timer.sleep(100)
+
+ assert :ok == Process.close_stdin(s)
+ add_event(logger, :input_close)
+ assert {:ok, 0} == Process.await_exit(s)
+ # Process.stop(s)
+
+ # wait for the reader to read
+ Elixir.Process.sleep(500)
+
+ assert [
+ {:write, "hello"},
+ {:write, "world"},
+ :input_close,
+ {:read, "aGVsbG93b3JsZA==\n"},
+ :eof
+ ] == get_events(logger)
+ end
+
+ test "reading from stderr" do
+ {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: true)
+ assert {:ok, "foo\n"} = Process.read_stderr(s, 100)
+ end
+
+ test "reading from stdout or stderr using read_any" do
+ script = """
+ echo "foo"
+ echo "bar" >&2
+ """
+
+ {:ok, s} = Process.start_link(["sh", "-c", script], enable_stderr: true)
+
+ {:ok, ret1} = Process.read_any(s, 100)
+ {:ok, ret2} = Process.read_any(s, 100)
+
+ assert {:stderr, "bar\n"} in [ret1, ret2]
+ assert {:stdout, "foo\n"} in [ret1, ret2]
+
+ assert :eof = Process.read_any(s, 100)
+ end
- if os_process_alive?(os_pid) do
+ test "reading from stderr_read when stderr disabled" do
+ {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: false)
+ assert {:error, :pipe_closed_or_invalid_caller} = Process.read_stderr(s, 100)
+ end
+
+ test "read_any with stderr disabled" do
+ script = """
+ echo "foo"
+ echo "bar" >&2
+ """
+
+ {:ok, s} = Process.start_link(["sh", "-c", script], enable_stderr: false)
+ {:ok, ret} = Process.read_any(s, 100)
+
+ # we can still read from stdout even if stderr is disabled
+ assert ret == {:stdout, "foo\n"}
+ assert :eof = Process.read_any(s, 100)
+ end
+
+ test "if pipe gets closed on pipe owner exit normally" do
+ {:ok, s} = Process.start_link(~w(sleep 10000))
+
+ writer =
+ Task.async(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+ end)
+
+ # stdin must be closed on task completion
+ :ok = Task.await(writer)
+
+ assert %State{
+ pipes: %{
+ stdin: %Pipe{
+ name: :stdin,
+ fd: _,
+ monitor_ref: nil,
+ owner: nil,
+ status: :closed
+ },
+ # ensure other pipes are unaffected
+ stdout: %Pipe{
+ name: :stdout,
+ status: :open
+ }
+ }
+ } = :sys.get_state(s.pid)
+ end
+
+ test "if pipe gets closed on pipe owner is killed" do
+ {:ok, s} = Process.start_link(~w(sleep 10000))
+
+ writer =
+ spawn(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+
+ receive do
+ :block -> :ok
+ end
+ end)
+
+ # wait for pipe owner to change
+ :timer.sleep(100)
+
+ # stdin must be closed on process kill
+ true = Elixir.Process.exit(writer, :kill)
:timer.sleep(1000)
- refute os_process_alive?(os_pid)
- else
- :ok
+
+ assert %State{
+ pipes: %{
+ stdin: %Pipe{
+ name: :stdin,
+ fd: _,
+ monitor_ref: nil,
+ owner: nil,
+ status: :closed
+ },
+ # ensure other pipes are unaffected
+ stdout: %Pipe{
+ name: :stdout,
+ status: :open
+ }
+ }
+ } = :sys.get_state(s.pid)
end
end
- test "exit status" do
- {:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
- assert {:ok, {:exit, 10}} == Process.await_exit(s, 500)
- Process.stop(s)
- end
+ describe "process termination" do
+ test "if external program terminates on process exit" do
+ {:ok, s} = Process.start_link(~w(cat))
+ {:ok, os_pid} = Process.os_pid(s)
- test "writing binary larger than pipe buffer size" do
- large_bin = generate_binary(5 * 65535)
- {:ok, s} = Process.start_link(~w(cat))
+ assert os_process_alive?(os_pid)
- writer =
- Task.async(fn ->
- Process.write(s, large_bin)
- Process.close_stdin(s)
- end)
+ :ok = Process.close_stdin(s)
+ :timer.sleep(100)
+
+ refute os_process_alive?(os_pid)
+ end
+
+ test "watcher kills external command on process without exit_await" do
+ {os_pid, s} =
+ Task.async(fn ->
+ {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ {:ok, os_pid} = Process.os_pid(s)
+ assert os_process_alive?(os_pid)
+
+ # ensure the script set the correct signal handlers (handlers to ignore signal)
+ assert {:ok, "ignored signals\n"} = Process.read(s)
+
+ # exit without waiting for the exile process
+ {os_pid, s}
+ end)
+ |> Task.await()
+
+ :timer.sleep(500)
+
+ # Exile Process should exit after Task process terminates
+ refute Elixir.Process.alive?(s.pid)
+ refute os_process_alive?(os_pid)
+ end
+
+ test "await_exit with timeout" do
+ {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ {:ok, os_pid} = Process.os_pid(s)
+ assert os_process_alive?(os_pid)
+
+ assert {:ok, "ignored signals\n"} = Process.read(s)
+
+ # attempt to kill the process after 100ms
+ assert {:ok, 137} = Process.await_exit(s, 100)
+
+ refute os_process_alive?(os_pid)
+ refute Elixir.Process.alive?(s.pid)
+ end
+
+ test "exit status" do
+ {:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
+ assert {:ok, 10} == Process.await_exit(s)
+ end
+
+ test "writing binary larger than pipe buffer size" do
+ large_bin = generate_binary(5 * 65_535)
+ {:ok, s} = Process.start_link(~w(cat))
+
+ writer =
+ Task.async(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+ Process.write(s, large_bin)
+ end)
- :timer.sleep(100)
+ :timer.sleep(100)
- iodata =
- Stream.unfold(nil, fn _ ->
- case Process.read(s) do
- {:ok, data} -> {data, nil}
- :eof -> nil
- end
+ iodata =
+ Stream.unfold(nil, fn _ ->
+ case Process.read(s) do
+ {:ok, data} -> {data, nil}
+ :eof -> nil
+ end
+ end)
+ |> Enum.to_list()
+
+ Task.await(writer)
+
+ assert IO.iodata_length(iodata) == 5 * 65_535
+ assert {:ok, 0} == Process.await_exit(s, 500)
+ end
+
+ test "if exile process is terminated on owner exit even if pipe owner is alive" do
+ parent = self()
+
+ owner =
+ spawn(fn ->
+ # owner process terminated without await_exit
+ {:ok, s} = Process.start_link(~w(cat))
+
+ snd(parent, {:ok, s})
+ :exit = recv(parent)
+ end)
+
+ {:ok, s} = recv(owner)
+
+ spawn_link(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+ block()
end)
- |> Enum.to_list()
- Task.await(writer)
+ spawn_link(fn ->
+ Process.change_pipe_owner(s, :stdout, self())
+ block()
+ end)
- assert IO.iodata_length(iodata) == 5 * 65535
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
- Process.stop(s)
- end
+ # wait for pipe owner to change
+ :timer.sleep(500)
- test "stderr_read" do
- {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], use_stderr: true)
- assert {:ok, "foo\n"} = Process.read_stderr(s, 100)
- Process.stop(s)
- end
+ snd(owner, :exit)
- test "stderr_read with stderr disabled" do
- {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], use_stderr: false)
- assert {:error, :cannot_read_stderr} = Process.read_stderr(s, 100)
- Process.stop(s)
- end
+ # wait for messages to propagate, if there are any
+ :timer.sleep(500)
- test "read_any" do
- script = """
- echo "foo"
- echo "bar" >&2
- """
+ refute Elixir.Process.alive?(owner)
+ refute Elixir.Process.alive?(s.pid)
+ end
- {:ok, s} = Process.start_link(["sh", "-c", script], use_stderr: true)
- {:ok, ret1} = Process.read_any(s, 100)
- {:ok, ret2} = Process.read_any(s, 100)
+ test "if exile process is *NOT* terminated on owner exit, if any pipe owner is alive" do
+ parent = self()
- assert {:stderr, "bar\n"} in [ret1, ret2]
- assert {:stdout, "foo\n"} in [ret1, ret2]
+ {:ok, s} = Process.start_link(~w(cat))
- assert :eof = Process.read_any(s, 100)
- Process.stop(s)
- end
+ io_proc =
+ spawn_link(fn ->
+ :ok = Process.change_pipe_owner(s, :stdin, self())
+ :ok = Process.change_pipe_owner(s, :stdout, self())
+ recv(parent)
+ end)
+
+ # wait for pipe owner to change
+ :timer.sleep(100)
+
+ # external process will be killed with SIGTERM (143)
+ assert {:ok, 143} = Process.await_exit(s, 100)
+
+ # wait for messages to propagate, if there are any
+ :timer.sleep(100)
+
+ assert Elixir.Process.alive?(s.pid)
+
+ assert %State{
+ pipes: %{
+ stdin: %Pipe{status: :open},
+ stdout: %Pipe{status: :open}
+ }
+ } = :sys.get_state(s.pid)
+
+ # when the io_proc exits, the pipes should be closed and process must terminate
+ snd(io_proc, :exit)
+ :timer.sleep(100)
+
+ refute Elixir.Process.alive?(s.pid)
+ end
+
+ test "when process is killed with a pending concurrent write" do
+ {:ok, s} = Process.start_link(~w(cat))
+ {:ok, os_pid} = Process.os_pid(s)
+
+ large_data =
+ Stream.cycle(["test"])
+ |> Stream.take(500_000)
+ |> Enum.to_list()
+ |> IO.iodata_to_binary()
+
+ task =
+ Task.async(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
+ Process.write(s, large_data)
+ end)
+
+ # sleep to avoid a race condition, e.g. the process being killed
+ # before the pipe owner is changed
+ :timer.sleep(200)
+
+ assert {:ok, 1} == Process.await_exit(s)
+
+ refute os_process_alive?(os_pid)
+ assert {:error, :epipe} == Task.await(task)
+ end
+
+ test "if owner is killed when the exile process is killed" do
+ parent = self()
+
+ # create an exile process without linking to caller
+ owner =
+ spawn(fn ->
+ assert {:ok, s} = Process.start_link(~w(cat))
+ snd(parent, s.pid)
+ block()
+ end)
+
+ owner_ref = Elixir.Process.monitor(owner)
+
+ exile_pid = recv(owner)
- test "read_any with stderr disabled" do
- script = """
- echo "foo"
- echo "bar" >&2
- """
+ exile_ref = Elixir.Process.monitor(exile_pid)
- {:ok, s} = Process.start_link(["sh", "-c", script], use_stderr: false)
- {:ok, ret1} = Process.read_any(s, 100)
+ assert Elixir.Process.alive?(owner)
+ assert Elixir.Process.alive?(exile_pid)
- assert ret1 == {:stdout, "foo\n"}
+ true = Elixir.Process.exit(exile_pid, :kill)
+
+ assert_receive {:DOWN, ^owner_ref, :process, ^owner, :killed}
+ assert_receive {:DOWN, ^exile_ref, :process, ^exile_pid, :killed}
+ end
+
+ test "if exile process is killed when the owner is killed" do
+ parent = self()
+
+ # create an exile process without linking to caller
+ owner =
+ spawn(fn ->
+ assert {:ok, s} = Process.start_link(~w(cat))
+ snd(parent, s.pid)
+ block()
+ end)
- assert :eof = Process.read_any(s, 100)
- Process.stop(s)
+ owner_ref = Elixir.Process.monitor(owner)
+
+ exile_pid = recv(owner)
+
+ exile_ref = Elixir.Process.monitor(exile_pid)
+
+ assert Elixir.Process.alive?(owner)
+ assert Elixir.Process.alive?(exile_pid)
+
+ true = Elixir.Process.exit(owner, :kill)
+
+ assert_receive {:DOWN, ^owner_ref, :process, ^owner, :killed}
+ assert_receive {:DOWN, ^exile_ref, :process, ^exile_pid, :killed}
+ end
end
test "back-pressure" do
logger = start_events_collector()
# we test backpressure by testing if `write` is delayed when we delay read
{:ok, s} = Process.start_link(~w(cat))
- large_bin = generate_binary(65535 * 5)
+ large_bin = generate_binary(65_535 * 5)
writer =
Task.async(fn ->
+ Process.change_pipe_owner(s, :stdin, self())
:ok = Process.write(s, large_bin)
add_event(logger, {:write, IO.iodata_length(large_bin)})
- Process.close_stdin(s)
end)
:timer.sleep(50)
reader =
Task.async(fn ->
+ Process.change_pipe_owner(s, :stdout, self())
+
Stream.unfold(nil, fn _ ->
case Process.read(s) do
{:ok, data} ->
add_event(logger, {:read, IO.iodata_length(data)})
# delay in reading should delay writes
:timer.sleep(10)
{nil, nil}
:eof ->
nil
end
end)
|> Stream.run()
end)
Task.await(writer)
Task.await(reader)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
- Process.stop(s)
+ assert {:ok, 0} == Process.await_exit(s)
events = get_events(logger)
{write_events, read_evants} = Enum.split_with(events, &match?({:write, _}, &1))
assert Enum.sum(Enum.map(read_evants, fn {:read, size} -> size end)) ==
Enum.sum(Enum.map(write_events, fn {:write, size} -> size end))
# There must be a read before write completes
- assert hd(events) == {:read, 65535}
+ assert hd(events) == {:read, 65_535}
end
# this test does not work properly in linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
- Process.stop(s)
-
open_files = parse_lsof(bin)
- assert [%{fd: "0", name: _, type: "PIPE"}, %{type: "PIPE", fd: "1", name: _}] = open_files
- end
-
- test "process kill with pending write" do
- {:ok, s} = Process.start_link(~w(cat))
- {:ok, os_pid} = Process.os_pid(s)
-
- large_data =
- Stream.cycle(["test"]) |> Stream.take(500_000) |> Enum.to_list() |> IO.iodata_to_binary()
- task =
- Task.async(fn ->
- try do
- Process.write(s, large_data)
- catch
- :exit, reason -> reason
- end
- end)
-
- :timer.sleep(200)
- Process.stop(s)
- :timer.sleep(3000)
-
- refute os_process_alive?(os_pid)
- assert {:normal, _} = Task.await(task)
+ assert [
+ %{type: "PIPE", fd: "0", name: _},
+ %{type: "PIPE", fd: "1", name: _},
+ %{type: "CHR", fd: "2", name: "/dev/ttys007"}
+ ] = open_files
end
- test "concurrent read" do
- {:ok, s} = Process.start_link(~w(cat))
-
- task = Task.async(fn -> Process.read(s, 1) end)
+ describe "options and validation" do
+ test "cd option" do
+ parent = Path.expand("..", File.cwd!())
+ {:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
+ {:ok, dir} = Process.read(s)
- # delaying concurrent read to avoid race-condition
- Elixir.Process.sleep(100)
- assert {:error, :pending_stdout_read} = Process.read(s, 1)
-
- assert :ok == Process.close_stdin(s)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 100)
- Process.stop(s)
- _ = Task.await(task)
- end
-
- test "cd" do
- parent = Path.expand("..", File.cwd!())
- {:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
- {:ok, dir} = Process.read(s)
- assert String.trim(dir) == parent
- assert {:ok, {:exit, 0}} = Process.await_exit(s)
- Process.stop(s)
- end
+ assert String.trim(dir) == parent
+ assert {:ok, 0} = Process.await_exit(s)
+ end
- test "invalid path" do
- assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
- end
+ test "when cd is invalid" do
+ assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
+ end
- test "invalid opt" do
- assert {:error, "invalid opts: [invalid: :test]"} =
- Process.start_link(~w(cat), invalid: :test)
- end
+ test "when user pass invalid option" do
+ assert {:error, "invalid opts: [invalid: :test]"} =
+ Process.start_link(~w(cat), invalid: :test)
+ end
- test "env" do
- assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
+ test "env option" do
+ assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
- assert {:ok, "test\n"} = Process.read(s)
- assert {:ok, {:exit, 0}} = Process.await_exit(s)
- Process.stop(s)
- end
+ assert {:ok, "test\n"} = Process.read(s)
+ assert {:ok, 0} = Process.await_exit(s)
+ end
- test "if external process inherits beam env" do
- :ok = System.put_env([{"BEAM_ENV_A", "10"}])
- assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
+ test "if external process inherits beam env" do
+ :ok = System.put_env([{"BEAM_ENV_A", "10"}])
+ assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
- assert {:ok, "10\n"} = Process.read(s)
- assert {:ok, {:exit, 0}} = Process.await_exit(s)
- Process.stop(s)
- end
+ assert {:ok, "10\n"} = Process.read(s)
+ assert {:ok, 0} = Process.await_exit(s)
+ end
- test "if user env overrides beam env" do
- :ok = System.put_env([{"BEAM_ENV", "base"}])
+ test "if user env overrides beam env" do
+ :ok = System.put_env([{"BEAM_ENV", "base"}])
- assert {:ok, s} =
- Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
+ assert {:ok, s} =
+ Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
- assert {:ok, "overridden\n"} = Process.read(s)
- assert {:ok, {:exit, 0}} = Process.await_exit(s)
- Process.stop(s)
+ assert {:ok, "overridden\n"} = Process.read(s)
+ assert {:ok, 0} = Process.await_exit(s)
+ end
end
- test "await_exit when process is stopped" do
- assert {:ok, s} = Process.start_link(~w(cat))
-
- tasks =
- Enum.map(1..10, fn _ ->
- Task.async(fn -> Process.await_exit(s) end)
- end)
-
- assert :ok == Process.close_stdin(s)
-
- Elixir.Process.sleep(100)
-
- Enum.each(tasks, fn task ->
- assert {:ok, {:exit, 0}} = Task.await(task)
+ def start_parallel_reader(process, logger) do
+ spawn_link(fn ->
+ :ok = Process.change_pipe_owner(process, :stdout, self())
+ reader_loop(process, logger)
end)
-
- Process.stop(s)
end
- def start_parallel_reader(proc_server, logger) do
- spawn_link(fn -> reader_loop(proc_server, logger) end)
- end
-
- def reader_loop(proc_server, logger) do
- case Process.read(proc_server) do
+ def reader_loop(process, logger) do
+ case Process.read(process) do
{:ok, data} ->
add_event(logger, {:read, data})
- reader_loop(proc_server, logger)
+ reader_loop(process, logger)
:eof ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
- Stream.repeatedly(fn -> "A" end) |> Enum.take(size) |> IO.iodata_to_binary()
+ Stream.repeatedly(fn -> "A" end)
+ |> Enum.take(size)
+ |> IO.iodata_to_binary()
+ end
+
+ defp block do
+ rand = :rand.uniform()
+
+ receive do
+ ^rand -> :ok
+ end
+ end
+
+ defp snd(pid, term) do
+ send(pid, {self(), term})
+ end
+
+ defp recv(sender) do
+ receive do
+ {^sender, term} -> term
+ after
+ 1000 ->
+ raise "recv timeout"
+ end
end
end
diff --git a/test/exile/sync_process_test.exs b/test/exile/sync_process_test.exs
index 51a74e8..164a69c 100644
--- a/test/exile/sync_process_test.exs
+++ b/test/exile/sync_process_test.exs
@@ -1,54 +1,54 @@
defmodule Exile.SyncProcessTest do
use ExUnit.Case, async: false
alias Exile.Process
- @bin Stream.repeatedly(fn -> "A" end) |> Enum.take(65535) |> IO.iodata_to_binary()
+ @bin Stream.repeatedly(fn -> "A" end) |> Enum.take(65_535) |> IO.iodata_to_binary()
test "memory leak" do
:timer.sleep(1000)
before_exec = :erlang.memory(:total)
{:ok, s} = Process.start_link(~w(cat))
Enum.each(1..500, fn _ ->
:ok = Process.write(s, @bin)
- {:ok, _} = Process.read(s, 65535)
+ {:ok, _} = Process.read(s, 65_535)
end)
:timer.sleep(1000)
after_exec = :erlang.memory(:total)
assert_in_delta before_exec, after_exec, 1024 * 1024
assert :ok == Process.close_stdin(s)
- assert {:ok, {:exit, 0}} == Process.await_exit(s, 500)
- Process.stop(s)
+ assert {:ok, 0} == Process.await_exit(s, 500)
+ # Process.stop(s)
end
test "if watcher process exits on command exit" do
stop_all_children(Exile.WatcherSupervisor)
assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
assert {:ok, s} = Process.start_link(~w(cat))
# we spawn in background
:timer.sleep(200)
assert %{active: 1, workers: 1} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
Process.close_stdin(s)
- assert {:ok, {:exit, 0}} = Process.await_exit(s, 500)
- Process.stop(s)
+ assert {:ok, 0} = Process.await_exit(s, 500)
+ # Process.stop(s)
# wait for watcher to terminate
:timer.sleep(200)
assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
end
defp stop_all_children(sup) do
DynamicSupervisor.which_children(sup)
|> Enum.each(fn {_, pid, _, _} ->
DynamicSupervisor.terminate_child(sup, pid)
end)
end
end
diff --git a/test/exile/watcher_test.exs b/test/exile/watcher_test.exs
index fc55ba0..82c8cb5 100644
--- a/test/exile/watcher_test.exs
+++ b/test/exile/watcher_test.exs
@@ -1,55 +1,47 @@
defmodule Exile.WatcherTest do
use ExUnit.Case, async: true
alias Exile.Process
- test "uds path socket cleanup after successful exit" do
+ test "when exile process exit normally" do
{:ok, s} = Process.start_link(~w(cat))
- %{socket_path: socket_path} = :sys.get_state(s)
-
- assert File.exists?(socket_path)
-
- :ok = Process.close_stdin(s)
- Elixir.Process.sleep(100)
-
- {:ok, {:exit, 0}} = Process.await_exit(s)
- :ok = Process.stop(s)
-
- Elixir.Process.sleep(100)
-
- refute File.exists?(socket_path)
- end
-
- test "external process and uds path cleanup on error" do
- {:ok, s} = Process.start_link(~w(cat))
- %{socket_path: socket_path} = :sys.get_state(s)
{:ok, os_pid} = Process.os_pid(s)
- assert File.exists?(socket_path)
assert os_process_alive?(os_pid)
- Elixir.Process.exit(s, :kill)
- Elixir.Process.sleep(100)
+ {:ok, 0} = Process.await_exit(s)
- refute File.exists?(socket_path)
refute os_process_alive?(os_pid)
end
- test "if external process is killed with SIGTERM" do
- {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ test "if external process is cleaned up when Exile Process is killed" do
+ parent = self()
- {:ok, os_pid} = Process.os_pid(s)
- assert os_process_alive?(os_pid)
- Process.stop(s)
+ # Exile process is linked to the caller, so we must run this test in
+ # a separate process which is not linked
+ spawn(fn ->
+ {:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
+ {:ok, os_pid} = Process.os_pid(s)
+ assert os_process_alive?(os_pid)
+ send(parent, os_pid)
+
+ Elixir.Process.exit(s.pid, :kill)
+ end)
+
+ os_pid =
+ receive do
+ os_pid -> os_pid
+ end
:timer.sleep(1000)
+
refute os_process_alive?(os_pid)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
end
diff --git a/test/exile_test.exs b/test/exile_test.exs
index 8f03e51..6c7a207 100644
--- a/test/exile_test.exs
+++ b/test/exile_test.exs
@@ -1,62 +1,92 @@
defmodule ExileTest do
use ExUnit.Case
+ doctest Exile
+
test "stream with enumerable" do
proc_stream =
- Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end), use_stderr: false)
+ Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end), enable_stderr: false)
- stdout = Enum.to_list(proc_stream)
+ stdout = proc_stream |> Enum.to_list()
assert IO.iodata_length(stdout) == 1000
end
test "stream with collectable" do
proc_stream =
Exile.stream!(["cat"], input: fn sink -> Enum.into(1..1000, sink, fn _ -> "a" end) end)
stdout = Enum.to_list(proc_stream)
assert IO.iodata_length(stdout) == 1000
end
test "stream without stdin" do
proc_stream = Exile.stream!(~w(echo hello))
stdout = Enum.to_list(proc_stream)
assert IO.iodata_to_binary(stdout) == "hello\n"
end
test "stderr" do
- proc_stream = Exile.stream!(["sh", "-c", "echo foo >>/dev/stderr"], use_stderr: true)
+ proc_stream = Exile.stream!(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: true)
assert {[], stderr} = split_stream(proc_stream)
assert IO.iodata_to_binary(stderr) == "foo\n"
end
test "multiple streams" do
script = """
for i in {1..1000}; do
echo "foo ${i}"
echo "bar ${i}" >&2
done
"""
- proc_stream = Exile.stream!(["sh", "-c", script], use_stderr: true)
+ proc_stream = Exile.stream!(["sh", "-c", script], enable_stderr: true)
{stdout, stderr} = split_stream(proc_stream)
stdout_lines = String.split(Enum.join(stdout), "\n", trim: true)
stderr_lines = String.split(Enum.join(stderr), "\n", trim: true)
assert length(stdout_lines) == length(stderr_lines)
assert Enum.all?(stdout_lines, &String.starts_with?(&1, "foo "))
assert Enum.all?(stderr_lines, &String.starts_with?(&1, "bar "))
end
+ test "environment variable" do
+ output =
+ Exile.stream!(~w(printenv FOO), env: %{"FOO" => "bar"})
+ |> Enum.to_list()
+ |> IO.iodata_to_binary()
+
+ assert output == "bar\n"
+ end
+
+ test "premature stream termination" do
+ input_stream = Stream.map(1..100_000, fn _ -> "hello" end)
+
+ assert_raise Exile.Process.Error,
+ "abnormal command exit, received EPIPE while writing to stdin",
+ fn ->
+ Exile.stream!(~w(cat), input: input_stream)
+ |> Enum.take(1)
+ end
+ end
+
+ test "premature stream termination when ignore_epipe is true" do
+ input_stream = Stream.map(1..100_000, fn _ -> "hello" end)
+
+ assert ["hello"] ==
+ Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true, max_chunk_size: 5)
+ |> Enum.take(1)
+ end
+
defp split_stream(stream) do
{stdout, stderr} =
Enum.reduce(stream, {[], []}, fn
{:stdout, data}, {stdout, stderr} -> {[data | stdout], stderr}
{:stderr, data}, {stdout, stderr} -> {stdout, [data | stderr]}
end)
{Enum.reverse(stdout), Enum.reverse(stderr)}
end
end
diff --git a/test/scripts/ignore_sigterm.sh b/test/scripts/ignore_sigterm.sh
index 67d05d3..e5fef5e 100755
--- a/test/scripts/ignore_sigterm.sh
+++ b/test/scripts/ignore_sigterm.sh
@@ -1,7 +1,10 @@
#!/bin/bash
trap -- '' SIGINT SIGTERM SIGTSTP
+
+echo "ignored signals"
+
while true; do
date +%F_%T
sleep 1
done

File Metadata

Mime Type
text/x-diff
Expires
Wed, Nov 27, 6:29 AM (1 d, 17 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40566
Default Alt Text
(157 KB)

Event Timeline