diff --git a/README.md b/README.md
index 50bdf1b..9fc135b 100644
--- a/README.md
+++ b/README.md
@@ -1,236 +1,244 @@
# Exile
[![CI](https://github.com/akash-akya/exile/actions/workflows/ci.yaml/badge.svg)](https://github.com/akash-akya/exile/actions/workflows/ci.yaml)
[![Hex.pm](https://img.shields.io/hexpm/v/exile.svg)](https://hex.pm/packages/exile)
[![docs](https://img.shields.io/badge/docs-hexpm-blue.svg)](https://hexdocs.pm/exile/)
Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure and non-blocking IO, and tries to fix the issues associated with ports.
Exile is built around the idea of having demand-driven, asynchronous
interaction with an external process. Think of streaming a video through
`ffmpeg` to serve a web request. Exile internally uses a NIF. See
[Rationale](#rationale) for details. It also provides a stream
abstraction for interacting with an external program. For example,
getting audio out of a video stream is as simple as
``` elixir
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
See `Exile.stream!/2` module doc for more details about handling
stderr and other options.
`Exile.stream!/2` is a convenience wrapper around
`Exile.Process`. Prefer using `Exile.stream!` over using
`Exile.Process` directly.
Exile requires OTP v22.1 and above.
Exile is based on a NIF; please understand the consequences of that before
using Exile. For basic use cases, use
[ExCmd](https://github.com/akash-akya/ex_cmd) instead.
## Installation
```elixir
def deps do
[
{:exile, "~> x.x.x"}
]
end
```
## Quick Start
Run a command and read from stdout
```elixir
iex> Exile.stream!(~w(echo Hello))
...> |> Enum.into("") # collect as string
"Hello\n"
```
Run a command with a list of strings as input
```elixir
iex> Exile.stream!(~w(cat), input: ["Hello", " ", "World"])
...> |> Enum.into("") # collect as string
"Hello World"
```
Run a command with input as a Stream
```elixir
iex> input_stream = Stream.map(1..10, fn num -> "#{num} " end)
iex> Exile.stream!(~w(cat), input: input_stream)
...> |> Enum.into("")
"1 2 3 4 5 6 7 8 9 10 "
```
Run a command with input as an infinite stream
```elixir
# create infinite stream
iex> input_stream = Stream.repeatedly(fn -> "A" end)
iex> binary =
...> Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true) # we need to ignore epipe since we are terminating the program before the input completes
...> |> Stream.take(2) # we must limit since the input stream is infinite
...> |> Enum.into("")
iex> is_binary(binary)
true
iex> "AAAAA" <> _ = binary
```
Run a command with a Collectable as input
```elixir
# Exile calls the callback with a sink where the process can push the data
iex> Exile.stream!(~w(cat), input: fn sink ->
...> Stream.map(1..10, fn num -> "#{num} " end)
...> |> Stream.into(sink) # push to the external process
...> |> Stream.run()
...> end)
...> |> Stream.take(100) # cap the amount of output we collect
...> |> Enum.into("")
"1 2 3 4 5 6 7 8 9 10 "
```
When the command waits for the input stream to close
```elixir
# the base64 command waits for the input to close and writes data to stdout at once
iex> Exile.stream!(~w(base64), input: ["abcdef"])
...> |> Enum.into("")
"YWJjZGVm\n"
```
`stream!/2` raises an error on non-zero exit status
```
iex> Exile.stream!(["sh", "-c", "echo 'foo' && exit 10"])
...> |> Enum.to_list()
** (Exile.Stream.AbnormalExit) program exited with exit status: 10
```
The `stream/2` variant returns the exit status as the last element
```
iex> Exile.stream(["sh", "-c", "echo 'foo' && exit 10"])
...> |> Enum.to_list()
[
"foo\n",
{:exit, {:status, 10}} # returns exit status of the program as last element
]
```
You can fetch exit_status from the error for `stream!/2`
```
iex> try do
...> Exile.stream!(["sh", "-c", "exit 10"])
...> |> Enum.to_list()
...> rescue
...> e in Exile.Stream.AbnormalExit ->
...> e.exit_status
...> end
10
```
With `max_chunk_size` set
```elixir
iex> data =
...> Exile.stream!(~w(cat /dev/urandom), max_chunk_size: 100, ignore_epipe: true)
...> |> Stream.take(5)
...> |> Enum.into("")
iex> byte_size(data)
500
```
When input and output run at different rates
```elixir
iex> input_stream = Stream.map(1..1000, fn num -> "X #{num} X\n" end)
iex> Exile.stream!(~w(grep 250), input: input_stream)
...> |> Enum.into("")
"X 250 X\n"
```
- With stderr enabled
+ With stderr set to `:consume`
```elixir
- iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], enable_stderr: true)
+ iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], stderr: :consume)
...> |> Enum.to_list()
[{:stdout, "foo\n"}, {:stderr, "bar\n"}]
```
+ With stderr set to `:disable`
+
+ ```elixir
+ iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], stderr: :disable)
+ ...> |> Enum.to_list()
+ ["foo\n"]
+ ```
+
For more details about stream API, see `Exile.stream!/2` and `Exile.stream/2`.
For more details about inner working, please check `Exile.Process` documentation.
## Rationale
Existing approaches
#### Port
Port is the default way of executing external commands. This is okay when you have control over the external program's implementation and the interaction is minimal. Port has several important issues.
* it can end up creating [zombie processes](https://hexdocs.pm/elixir/Port.html#module-zombie-operating-system-processes)
* it cannot selectively close stdin. This is required when the external program acts on EOF from stdin
* it sends command output as messages to the beam process. This does not put back-pressure on the external program and can lead to exhausting VM memory (see the sketch below)
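A minimal sketch (plain `Port`, not Exile code) of the last point: output chunks arrive as messages whether or not the consumer is ready, so a fast producer can grow the owner's mailbox without bound.
```elixir
# Plain port running a fast producer. Chunks are pushed to the owner's
# mailbox eagerly; there is no way to pause the external program until
# we are ready for more data.
port =
  Port.open(
    {:spawn_executable, System.find_executable("cat")},
    [:binary, args: ["/dev/urandom"]]
  )

receive do
  {^port, {:data, chunk}} -> IO.puts("got #{byte_size(chunk)} bytes")
end

Port.close(port)
```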
#### Middleware based solutions
Libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. solve the first two issues associated with ports - zombie processes and selectively closing STDIN - but not the third one - back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run, and internally use a port for reading the output and writing input. Note that these libraries solve different subsets of issues and have different functionality; please check the relevant project pages for details.
* no back-pressure
* an additional OS process (middleware) for every execution of your program
* in a few cases, such as Porcelain, the user has to install the middleware program explicitly
* might not be suitable when the program requires constant communication between the beam process and the external program
On the plus side, unlike Exile, bugs in the implementation do not bring down the whole beam VM.
#### [ExCmd](https://github.com/akash-akya/ex_cmd)
This is my other stab at solving the back-pressure issue. It implements a demand-driven protocol using [odu](https://github.com/akash-akya/odu). Since ExCmd is also a port-based solution, the concerns previously mentioned apply to ExCmd too.
## Exile
Internally Exile uses non-blocking, asynchronous system calls to interact with the external process. It does not use the port's message-based communication; instead it does raw stdio using a NIF. Most of the system calls are non-blocking, so they should not block the beam schedulers, and IO makes use of dirty schedulers.
**Highlights**
* Back pressure
* no middleware program
* no additional os process. No performance/resource cost
* no need to install any external command
* tries to handle zombie processes by attempting to clean up the external process. *But* as there is no middleware involved with Exile, it is still possible to end up with a zombie process if the program misbehaves.
* stream abstraction
* selectively consume stdout and stderr streams
If you are executing a huge number of external programs **concurrently** (more than a few hundred) you might have to increase the open file descriptors limit (`ulimit -n`).
Non-blocking IO can be used for other interesting things, such as reading named pipe (FIFO) files. `Exile.stream!(~w(cat data.pipe))` does not block schedulers, so you can open hundreds of FIFO files, unlike the default `file` based IO.
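For example, a hedged sketch of reading several FIFOs concurrently (the paths are illustrative and the pipes are assumed to already exist, e.g. created with `mkfifo`):
```elixir
# Each Exile stream reads a named pipe with non-blocking IO, so waiting
# on many FIFOs does not tie up the beam schedulers.
["/tmp/data1.pipe", "/tmp/data2.pipe"]
|> Task.async_stream(
  fn path ->
    Exile.stream!(["cat", path])
    |> Enum.into("")
  end,
  timeout: :infinity
)
|> Enum.to_list()
```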
#### TODO
* add benchmarks results
### 🚨 Obligatory NIF warning
As with any NIF-based solution, bugs or issues in the Exile implementation **can bring down the beam VM**. But the NIF implementation is comparatively small and mostly uses POSIX system calls. Also, spawned external processes are still completely isolated at the OS level.
If all you want is to run a command with no communication, then just sticking with `System.cmd` is a better choice.
### License
Copyright (c) 2020 Akash Hiremath.
Exile source code is released under Apache License 2.0. Check [LICENSE](LICENSE.md) for more information.
diff --git a/c_src/spawner.c b/c_src/spawner.c
index f9377ef..0c3935d 100644
--- a/c_src/spawner.c
+++ b/c_src/spawner.c
@@ -1,233 +1,237 @@
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
// #define DEBUG
#ifdef DEBUG
#define debug(...) \
do { \
fprintf(stderr, "%s:%d\t(fn \"%s\") - ", __FILE__, __LINE__, __func__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
} while (0)
#else
#define debug(...)
#endif
#define error(...) \
do { \
fprintf(stderr, "%s:%d\t(fn: \"%s\") - ", __FILE__, __LINE__, __func__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
} while (0)
static const int PIPE_READ = 0;
static const int PIPE_WRITE = 1;
/* We are choosing an exit code which is not reserved see:
* https://www.tldp.org/LDP/abs/html/exitcodes.html. */
static const int FORK_EXEC_FAILURE = 125;
static int set_flag(int fd, int flags) {
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
}
static int send_io_fds(int socket, int stdin_fd, int stdout_fd, int stderr_fd) {
struct msghdr msg = {0};
struct cmsghdr *cmsg;
int fds[3];
char buf[CMSG_SPACE(3 * sizeof(int))];
char dup[256];
struct iovec io;
memset(buf, '\0', sizeof(buf));
io.iov_base = &dup;
io.iov_len = sizeof(dup);
msg.msg_iov = &io;
msg.msg_iovlen = 1;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(3 * sizeof(int));
fds[0] = stdin_fd;
fds[1] = stdout_fd;
fds[2] = stderr_fd;
debug("stdout: %d, stderr: %d", stdout_fd, stderr_fd);
memcpy((int *)CMSG_DATA(cmsg), fds, 3 * sizeof(int));
if (sendmsg(socket, &msg, 0) < 0) {
debug("Failed to send message");
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
static void close_pipes(int pipes[3][2]) {
for (int i = 0; i < 3; i++) {
if (pipes[i][PIPE_READ] > 0)
close(pipes[i][PIPE_READ]);
if (pipes[i][PIPE_WRITE] > 0)
close(pipes[i][PIPE_WRITE]);
}
}
static int exec_process(char const *bin, char *const *args, int socket,
- bool enable_stderr) {
+ char const *stderr_str) {
int pipes[3][2] = {{0, 0}, {0, 0}, {0, 0}};
int r_cmdin, w_cmdin, r_cmdout, w_cmdout, r_cmderr, w_cmderr;
int i;
if (pipe(pipes[STDIN_FILENO]) == -1 || pipe(pipes[STDOUT_FILENO]) == -1 ||
pipe(pipes[STDERR_FILENO]) == -1) {
perror("[spawner] failed to create pipes");
close_pipes(pipes);
return 1;
}
debug("created pipes");
r_cmdin = pipes[STDIN_FILENO][PIPE_READ];
w_cmdin = pipes[STDIN_FILENO][PIPE_WRITE];
r_cmdout = pipes[STDOUT_FILENO][PIPE_READ];
w_cmdout = pipes[STDOUT_FILENO][PIPE_WRITE];
r_cmderr = pipes[STDERR_FILENO][PIPE_READ];
w_cmderr = pipes[STDERR_FILENO][PIPE_WRITE];
if (set_flag(r_cmdin, O_CLOEXEC) < 0 || set_flag(w_cmdout, O_CLOEXEC) < 0 ||
set_flag(w_cmderr, O_CLOEXEC) < 0 ||
set_flag(w_cmdin, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmdout, O_CLOEXEC | O_NONBLOCK) < 0 ||
set_flag(r_cmderr, O_CLOEXEC | O_NONBLOCK) < 0) {
perror("[spawner] failed to set flags for pipes");
close_pipes(pipes);
return 1;
}
debug("set fd flags to pipes");
if (send_io_fds(socket, w_cmdin, r_cmdout, r_cmderr) != EXIT_SUCCESS) {
perror("[spawner] failed to send fd via socket");
close_pipes(pipes);
return 1;
}
debug("sent fds over UDS");
close(STDIN_FILENO);
close(w_cmdin);
if (dup2(r_cmdin, STDIN_FILENO) < 0) {
perror("[spawner] failed to dup to stdin");
_exit(FORK_EXEC_FAILURE);
}
close(STDOUT_FILENO);
close(r_cmdout);
if (dup2(w_cmdout, STDOUT_FILENO) < 0) {
perror("[spawner] failed to dup to stdout");
_exit(FORK_EXEC_FAILURE);
}
- if (enable_stderr) {
+ if (strcmp(stderr_str, "consume") == 0) {
+ debug("== %d", strcmp(stderr_str, "consume"));
close(STDERR_FILENO);
close(r_cmderr);
if (dup2(w_cmderr, STDERR_FILENO) < 0) {
perror("[spawner] failed to dup to stderr");
_exit(FORK_EXEC_FAILURE);
}
+ } else if (strcmp(stderr_str, "disable") == 0) {
+ close(STDERR_FILENO);
+ close(r_cmderr);
+ close(w_cmderr);
+
+ int null_fd = open("/dev/null", O_WRONLY);
+ if (dup2(null_fd, STDERR_FILENO) < 0) {
+ perror("[spawner] failed to dup to stderr");
+ _exit(FORK_EXEC_FAILURE);
+ }
} else {
close(r_cmderr);
close(w_cmderr);
}
/* Close all non-standard io fds. Not closing STDERR */
for (i = STDERR_FILENO + 1; i < sysconf(_SC_OPEN_MAX); i++)
close(i);
debug("exec %s", bin);
execvp(bin, args);
perror("[spawner] execvp(): failed");
_exit(FORK_EXEC_FAILURE);
}
-static int spawn(const char *socket_path, const char *enable_stderr_str,
+static int spawn(const char *socket_path, const char *stderr_str,
const char *bin, char *const *args) {
int socket_fd;
struct sockaddr_un socket_addr;
- bool enable_stderr;
-
- if (strcmp(enable_stderr_str, "true") == 0) {
- enable_stderr = true;
- } else {
- enable_stderr = false;
- }
socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (socket_fd == -1) {
debug("Failed to create socket");
return EXIT_FAILURE;
}
debug("created domain socket");
memset(&socket_addr, 0, sizeof(struct sockaddr_un));
socket_addr.sun_family = AF_UNIX;
strncpy(socket_addr.sun_path, socket_path, sizeof(socket_addr.sun_path) - 1);
if (connect(socket_fd, (struct sockaddr *)&socket_addr,
sizeof(struct sockaddr_un)) == -1) {
debug("Failed to connect to socket");
return EXIT_FAILURE;
}
debug("connected to exile");
- if (exec_process(bin, args, socket_fd, enable_stderr) != 0)
+ if (exec_process(bin, args, socket_fd, stderr_str) != 0)
return EXIT_FAILURE;
// we should never reach here
return EXIT_SUCCESS;
}
int main(int argc, const char *argv[]) {
int status, i;
const char **exec_argv;
if (argc < 4) {
debug("expected at least 3 arguments, passed %d", argc);
status = EXIT_FAILURE;
} else {
exec_argv = malloc((argc - 3 + 1) * sizeof(char *));
for (i = 3; i < argc; i++)
exec_argv[i - 3] = argv[i];
exec_argv[i - 3] = NULL;
- debug("socket path: %s enable_stderr: %s bin: %s", argv[1], argv[2], argv[3]);
+ debug("socket path: %s stderr: %s bin: %s", argv[1], argv[2], argv[3]);
status = spawn(argv[1], argv[2], argv[3], (char *const *)exec_argv);
}
exit(status);
}
diff --git a/lib/exile.ex b/lib/exile.ex
index 30aa502..83422d5 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,277 +1,288 @@
defmodule Exile do
@moduledoc ~S"""
Exile is an alternative to beam [ports](https://hexdocs.pm/elixir/Port.html)
with back-pressure and non-blocking IO.
### Quick Start
Run a command and read from stdout
```
iex> Exile.stream!(~w(echo Hello))
...> |> Enum.into("") # collect as string
"Hello\n"
```
Run a command with a list of strings as input
```
iex> Exile.stream!(~w(cat), input: ["Hello", " ", "World"])
...> |> Enum.into("") # collect as string
"Hello World"
```
Run a command with input as a Stream
```
iex> input_stream = Stream.map(1..10, fn num -> "#{num} " end)
iex> Exile.stream!(~w(cat), input: input_stream)
...> |> Enum.into("")
"1 2 3 4 5 6 7 8 9 10 "
```
Run a command with input as an infinite stream
```
# create infinite stream
iex> input_stream = Stream.repeatedly(fn -> "A" end)
iex> binary =
...> Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true) # we need to ignore epipe since we are terminating the program before the input completes
...> |> Stream.take(2) # we must limit since the input stream is infinite
...> |> Enum.into("")
iex> is_binary(binary)
true
iex> "AAAAA" <> _ = binary
```
Run a command with a Collectable as input
```
# Exile calls the callback with a sink where the process can push the data
iex> Exile.stream!(~w(cat), input: fn sink ->
...> Stream.map(1..10, fn num -> "#{num} " end)
...> |> Stream.into(sink) # push to the external process
...> |> Stream.run()
...> end)
...> |> Stream.take(100) # cap the amount of output we collect
...> |> Enum.into("")
"1 2 3 4 5 6 7 8 9 10 "
```
When the command waits for the input stream to close
```
# the base64 command waits for the input to close and writes data to stdout at once
iex> Exile.stream!(~w(base64), input: ["abcdef"])
...> |> Enum.into("")
"YWJjZGVm\n"
```
`stream!/2` raises an error on non-zero exit status
```
iex> Exile.stream!(["sh", "-c", "echo 'foo' && exit 10"])
...> |> Enum.to_list()
** (Exile.Stream.AbnormalExit) program exited with exit status: 10
```
The `stream/2` variant returns the exit status as the last element
```
iex> Exile.stream(["sh", "-c", "echo 'foo' && exit 10"])
...> |> Enum.to_list()
[
"foo\n",
{:exit, {:status, 10}} # returns exit status of the program as last element
]
```
You can fetch exit_status from the error for `stream!/2`
```
iex> try do
...> Exile.stream!(["sh", "-c", "exit 10"])
...> |> Enum.to_list()
...> rescue
...> e in Exile.Stream.AbnormalExit ->
...> e.exit_status
...> end
10
```
With `max_chunk_size` set
```
iex> data =
...> Exile.stream!(~w(cat /dev/urandom), max_chunk_size: 100, ignore_epipe: true)
...> |> Stream.take(5)
...> |> Enum.into("")
iex> byte_size(data)
500
```
When input and output run at different rates
```
iex> input_stream = Stream.map(1..1000, fn num -> "X #{num} X\n" end)
iex> Exile.stream!(~w(grep 250), input: input_stream)
...> |> Enum.into("")
"X 250 X\n"
```
- With stderr enabled
+ With stderr set to :consume
```
- iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], enable_stderr: true)
+ iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], stderr: :consume)
...> |> Enum.to_list()
[{:stdout, "foo\n"}, {:stderr, "bar\n"}]
```
+ With stderr set to :disable
+
+ ```
+ iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], stderr: :disable)
+ ...> |> Enum.to_list()
+ ["foo\n"]
+ ```
+
For more details about stream API, see `Exile.stream!/2` and `Exile.stream/2`.
For more details about inner working, please check `Exile.Process`
documentation.
"""
use Application
@doc false
def start(_type, _args) do
opts = [
name: Exile.WatcherSupervisor,
strategy: :one_for_one
]
# We use DynamicSupervisor for cleaning up external processes on
# :init.stop or SIGTERM
DynamicSupervisor.start_link(opts)
end
@doc ~S"""
Runs the command with arguments and returns the stdout as a lazy
Enumerable stream, similar to [`Stream`](https://hexdocs.pm/elixir/Stream.html).
The first parameter must be a list containing the command with arguments.
Example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* Enumerable:
```
# List
Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
# Stream
Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65_536)) |> Enum.to_list()
```
* Collectable:
If the input is a function with arity 1, Exile will call that function
with a `Collectable` as the argument. The function must *push* input to this
collectable. The return value of the function is ignored.
```
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
By default no input is sent to the command.
* `exit_timeout` - Duration to wait for external program to exit after completion
(when stream ends). Defaults to `:infinity`
* `max_chunk_size` - Maximum size of iodata chunk emitted by the stream.
Chunk size can be less than the `max_chunk_size` depending on the amount of
data available to be read. Defaults to `65_535`
- * `enable_stderr` - When set to true, output stream will contain stderr data along
- with stdout. Stream data will be of the form `{:stdout, iodata}` or `{:stderr, iodata}`
- to differentiate different streams. Defaults to false. See example below
+ * `stderr` - different ways to handle the stderr stream. Possible values: `:console`, `:disable`, `:consume`.
+ 1. `:console` - stderr output is redirected to the console (default)
+ 2. `:disable` - stderr output is redirected to `/dev/null`, suppressing all output
+ 3. `:consume` - connects stderr for consumption. The output stream will contain stderr
+ data along with stdout. Stream data will be either `{:stdout, iodata}` or `{:stderr, iodata}`
+ to differentiate the streams. See the example below.
* `ignore_epipe` - When set to true, the reader can exit early without raising an error.
Typically the writer gets an `EPIPE` error on write when the program terminates prematurely.
With `ignore_epipe` set to true this error will be ignored. This can be used to
match the UNIX shell default behaviour. EPIPE is the error raised when the reader finishes
reading and closes the output pipe before the command completes. Defaults to `false`. See the sketch below.
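For example, a small sketch of the behaviour described above (`head -n 1` exits after the first line, so the writer would otherwise hit EPIPE):
```
# Without `ignore_epipe: true` the early exit of `head` would surface
# as an EPIPE abnormal exit; with it the stream simply ends.
Exile.stream!(~w(head -n 1), input: Stream.cycle(["hello\n"]), ignore_epipe: true)
|> Enum.to_list()
```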
Remaining options are passed to `Exile.Process.start_link/2`
If the program exits with a non-zero exit status or `:epipe` then an `Exile.Stream.AbnormalExit`
error will be raised with the `exit_status` field set.
### Examples
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
Stream with stderr
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1),
input: File.stream!("music_video.mkv", [], 65_535),
- enable_stderr: true
+ stderr: :consume
)
|> Stream.transform(
fn ->
File.open!("music.mp3", [:write, :binary])
end,
fn elem, file ->
case elem do
{:stdout, data} ->
# write stdout data to a file
:ok = IO.binwrite(file, data)
{:stderr, msg} ->
# write stderr output to console
:ok = IO.write(msg)
end
{[], file}
end,
fn file ->
:ok = File.close(file)
end
)
|> Stream.run()
```
"""
@type collectable_func() :: (Collectable.t() -> any())
@spec stream!(nonempty_list(String.t()),
input: Enum.t() | collectable_func(),
exit_timeout: timeout(),
- enable_stderr: boolean(),
+ stderr: :console | :disable | :consume,
ignore_epipe: boolean(),
max_chunk_size: pos_integer()
) :: Exile.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, Keyword.put(opts, :stream_exit_status, false))
end
@doc ~S"""
Same as `Exile.stream!/2` but the program exit status is passed as the last
element of the stream.
The last element will be of the form `{:exit, term()}`. `term` will be
`{:status, exit_status}` in case of normal termination and `:epipe` in case of an epipe error.
See the `Exile.stream!/2` documentation for details about the options and
examples.
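For example, a minimal sketch based on the `{:exit, term()}` format described above:
```
Exile.stream(~w(true))
|> Enum.to_list()
# => [{:exit, {:status, 0}}]
```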
"""
@spec stream(nonempty_list(String.t()),
input: Enum.t() | collectable_func(),
exit_timeout: timeout(),
- enable_stderr: boolean(),
+ stderr: :console | :disable | :consume,
ignore_epipe: boolean(),
max_chunk_size: pos_integer()
) :: Exile.Stream.t()
def stream(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, Keyword.put(opts, :stream_exit_status, true))
end
end
diff --git a/lib/exile/process.ex b/lib/exile/process.ex
index 3ac80cf..80ebae2 100644
--- a/lib/exile/process.ex
+++ b/lib/exile/process.ex
@@ -1,861 +1,864 @@
defmodule Exile.Process do
@moduledoc ~S"""
GenServer which wraps spawned external command.
Prefer `Exile.stream!/1` over using this directly. Use this only if you are
familiar with the life-cycle and need more control over the IO streams
and the OS process.
## Comparison with Port
* it is demand driven. The user explicitly has to `read` the command
output, and the progress of the external command is controlled
using OS pipes. Exile never loads more output than we can consume,
so we should never experience memory issues
* it can close stdin while consuming output
* tries to handle zombie processes by attempting to clean up the
external process. Note that there is no middleware involved
with Exile so it is still possible to end up with a zombie process.
* selectively consume stdout and stderr
Internally Exile uses non-blocking, asynchronous system calls
to interact with the external process. It does not use the port's
message-based communication; instead it uses raw stdio and a NIF.
Most of the system calls are non-blocking, so they should not
block the beam schedulers, and IO makes use of dirty schedulers.
## Introduction
`Exile.Process` is a process-based wrapper around the external
process. It is similar to a `port` as an entity but the interface is
different. All communication with the external process must happen
via the `Exile.Process` interface.
The Exile process life-cycle is tied to the external process and its owners. All
system resources, such as open file descriptors and the external process,
are cleaned up when the `Exile.Process` dies.
### Owner
Each `Exile.Process` has an owner, which is the process that
created it (via `Exile.Process.start_link/2`). The process owner cannot
be changed.
The owner process will be linked to the `Exile.Process`. So when the
Exile process dies abnormally the owner will be killed too, or
vice versa. The owner process should avoid trapping the exit signal; if
you want to avoid the caller getting killed, create a separate process
as owner to run the command and monitor that process.
Only the owner can get the exit status of the command, using
`Exile.Process.await_exit/2`. All Exile processes **MUST** be
awaited. The exit status or reason is **ALWAYS** sent to the owner. It
is similar to [`Task`](https://hexdocs.pm/elixir/Task.html). If the
owner exits without `await_exit`, the Exile process will be killed,
but if the owner continues without `await_exit` then the Exile
process will linger around till the process exits.
```
iex> alias Exile.Process
iex> {:ok, p} = Process.start_link(~w(echo hello))
iex> Process.read(p, 100)
{:ok, "hello\n"}
iex> Process.read(p, 100) # read till we get :eof
:eof
iex> Process.await_exit(p)
{:ok, 0}
```
### Pipe & Pipe Owner
Standard IO pipes/channels/streams of the external process, such as
STDIN, STDOUT, and STDERR, are called Pipes. The user can either write
or read data from pipes.
Each pipe has an owner process and only that process can write to or
read from the Exile process. By default the process that created the
Exile process is the owner of all the pipes. The pipe owner can be
changed using `Exile.Process.change_pipe_owner/3`.
The pipe owner is monitored and the pipes are closed automatically when
the pipe owner exits. The pipe owner can close a pipe early using
`Exile.Process.close_stdin/1` etc.
`Exile.Process.await_exit/2` closes all of the caller-owned pipes by
default.
```
iex> {:ok, p} = Process.start_link(~w(cat))
iex> writer = Task.async(fn ->
...> :ok = Process.change_pipe_owner(p, :stdin, self())
...> Process.write(p, "Hello World")
...> end)
iex> Task.await(writer)
:ok
iex> Process.read(p, 100)
{:ok, "Hello World"}
iex> Process.await_exit(p)
{:ok, 0}
```
### Pipe Operations
The pipe owner can read or write data to the owned pipe. `:stderr` by
- default is disabled, data written to stderr will appear on the
- console. You can enable reading stderr by passing `enable_stderr:
- true` during process creation.
+ default is connected to the console, and data written to stderr will appear on
+ the console. You can enable reading stderr by passing `stderr: :consume`
+ during process creation.
Special function `Exile.Process.read_any/2` can be used to read
from either stdout or stderr whichever has the data available.
All pipe operations block the caller, to have blocking as natural
back-pressure and to make the API simple. This is an important
feature of Exile: the ability to block the caller when the stdio
buffer is full, exactly like how programs work on the shell
with pipes between them: `cat large-file | grep "foo"`. Internally it
does not block the Exile process or the VM (which is typically the case
with NIF calls). Because of this the user can make concurrent reads and
writes to different pipes from separate processes. Internally Exile
uses asynchronous IO APIs to avoid blocking the VM or VM processes.
Reading from stderr
```
# write "Hello" to stdout and "World" to stderr
iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n")
- iex> {:ok, p} = Process.start_link(["sh", "-c", script], enable_stderr: true)
+ iex> {:ok, p} = Process.start_link(["sh", "-c", script], stderr: :consume)
iex> Process.read(p, 100)
{:ok, "Hello\n"}
iex> Process.read_stderr(p, 100)
{:ok, "World\n"}
iex> Process.await_exit(p)
{:ok, 0}
```
Reading using `read_any`
```
# write "Hello" to stdout and "World" to stderr
iex> script = Enum.join(["echo Hello", "echo World >&2"], "\n")
- iex> {:ok, p} = Process.start_link(["sh", "-c", script], enable_stderr: true)
+ iex> {:ok, p} = Process.start_link(["sh", "-c", script], stderr: :consume)
iex> Process.read_any(p)
{:ok, {:stdout, "Hello\n"}}
iex> Process.read_any(p)
{:ok, {:stderr, "World\n"}}
iex> Process.await_exit(p)
{:ok, 0}
```
### Process Termination
When the owner exits (normally or abnormally) the Exile process is always
terminated irrespective of pipe status or process status. The external
process gets a chance to terminate gracefully; if that fails it will
be killed.
If the owner calls `await_exit` then the owner-owned pipes are closed
and we wait for the external process to terminate. If the process has
already terminated then the call returns immediately with the exit
status. Otherwise the command will be stopped gracefully following
the exit sequence based on the timeout value (5s by default).
If the owner calls `await_exit` with `timeout` as `:infinity` then
Exile does not attempt to forcefully stop the external command and
waits for the command to exit on its own. The `await_exit` call can block
indefinitely waiting for the external process to terminate.
If the external process exits on its own, the exit status is collected and
the Exile process will wait for the owner to close the pipes. Most commands
exit when their pipes are closed, so just ensuring that pipes are closed
when the work is done should be enough.
Example of process getting terminated by `SIGTERM` signal
```
# sleep command does not watch for stdin or stdout, so closing the
# pipe does not terminate the sleep command.
iex> {:ok, p} = Process.start_link(~w(sleep 100000000)) # sleep indefinitely
iex> Process.await_exit(p, 100) # ensure `await_exit` finish within `100ms`. By default it waits for 5s
{:ok, 143} # 143 is the exit status when command exit due to SIGTERM
```
## Examples
Run a command without any input or output
```
iex> {:ok, p} = Process.start_link(["sh", "-c", "exit 1"])
iex> Process.await_exit(p)
{:ok, 1}
```
Single process reading and writing to the command
```
# bc is a calculator, which reads from stdin and writes output to stdout
iex> {:ok, p} = Process.start_link(~w(bc))
iex> Process.write(p, "1 + 1\n") # there must be new-line to indicate the end of the input line
:ok
iex> Process.read(p)
{:ok, "2\n"}
iex> Process.write(p, "2 * 10 + 1\n")
:ok
iex> Process.read(p)
{:ok, "21\n"}
# We must close stdin to signal the `bc` command that we are done.
# since `await_exit` implicitly closes the pipes, in this case we don't have to
iex> Process.await_exit(p)
{:ok, 0}
```
Running a command which flushes the output on stdin close. This is not
supported by Erlang/Elixir ports.
```
# `base64` command reads all input and writes encoded output when stdin is closed.
iex> {:ok, p} = Process.start_link(~w(base64))
iex> Process.write(p, "abcdef")
:ok
iex> Process.close_stdin(p) # we can selectively close stdin and read all output
:ok
iex> Process.read(p)
{:ok, "YWJjZGVm\n"}
iex> Process.read(p) # typically it is better to read till we receive :eof when we are not sure how big the output data size is
:eof
iex> Process.await_exit(p)
{:ok, 0}
```
Read and write to pipes in separate processes
```
iex> {:ok, p} = Process.start_link(~w(cat))
iex> writer = Task.async(fn ->
...> :ok = Process.change_pipe_owner(p, :stdin, self())
...> Process.write(p, "Hello World")
...> # no need to close the pipe explicitly here. Pipe will be closed automatically when process exit
...> end)
iex> reader = Task.async(fn ->
...> :ok = Process.change_pipe_owner(p, :stdout, self())
...> Process.read(p)
...> end)
iex> :timer.sleep(500) # wait for the reader and writer to change pipe owner, otherwise `await_exit` will close the pipes before we change pipe owner
iex> Process.await_exit(p, :infinity) # let the reader and writer take indefinite time to finish
{:ok, 0}
iex> Task.await(writer)
:ok
iex> Task.await(reader)
{:ok, "Hello World"}
```
"""
use GenServer
alias Exile.Process.Exec
alias Exile.Process.Nif
alias Exile.Process.Operations
alias Exile.Process.Pipe
alias Exile.Process.State
require Logger
defmodule Error do
defexception [:message]
end
@type pipe_name :: :stdin | :stdout | :stderr
@type t :: %__MODULE__{
monitor_ref: reference(),
exit_ref: reference(),
pid: pid | nil,
owner: pid
}
defstruct [:monitor_ref, :exit_ref, :pid, :owner]
@type exit_status :: non_neg_integer
- @default_opts [env: [], enable_stderr: false]
+ @default_opts [env: [], stderr: :console]
@default_buffer_size 65_535
@os_signal_timeout 1000
@doc """
Starts `Exile.Process` server.
Starts external program using `cmd_with_args` with options `opts`
`cmd_with_args` must be a list containing command with arguments.
example: `["cat", "file.txt"]`.
### Options
* `cd` - the directory to run the command in
* `env` - a list of tuples containing environment key-value.
These can be accessed in the external program
- * `enable_stderr` - when set to true, Exile connects stderr
- pipe for the consumption. Defaults to false. Note that when set
- to true stderr must be consumed to avoid external program from blocking.
+ * `stderr` - different ways to handle the stderr stream.
+ Possible values: `:console`, `:disable`, `:consume`.
+ 1. `:console` - stderr output is redirected to the console (default)
+ 2. `:disable` - stderr output is redirected to `/dev/null`, suppressing all output
+ 3. `:consume` - connects stderr for consumption. When set to `:consume` the output must be consumed to
+ avoid the external program from blocking.
The caller of the process will be the owner of the Exile process,
and the default owner of all opened pipes.
Please check module documentation for more details
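For example, a minimal sketch of starting a process with these options (the command, `cd`, and `env` values are illustrative):
```
{:ok, p} = Exile.Process.start_link(~w(env), cd: "/tmp", env: [{"FOO", "bar"}])
{:ok, output} = Exile.Process.read(p)
{:ok, 0} = Exile.Process.await_exit(p)
```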
"""
@spec start_link(nonempty_list(String.t()),
cd: String.t(),
env: [{String.t(), String.t()}],
- enable_stderr: boolean()
+ stderr: :console | :disable | :consume
) :: {:ok, t} | {:error, any()}
def start_link(cmd_with_args, opts \\ []) do
opts = Keyword.merge(@default_opts, opts)
case Exec.normalize_exec_args(cmd_with_args, opts) do
{:ok, args} ->
owner = self()
exit_ref = make_ref()
args = Map.merge(args, %{owner: owner, exit_ref: exit_ref})
{:ok, pid} = GenServer.start_link(__MODULE__, args)
ref = Process.monitor(pid)
process = %__MODULE__{
pid: pid,
monitor_ref: ref,
exit_ref: exit_ref,
owner: owner
}
{:ok, process}
error ->
error
end
end
@doc """
Closes external program's standard input pipe (stdin).
Only owner of the pipe can close the pipe. This call will return
immediately.
"""
@spec close_stdin(t) :: :ok | {:error, :pipe_closed_or_invalid_caller} | {:error, any()}
def close_stdin(process) do
GenServer.call(process.pid, {:close_pipe, :stdin}, :infinity)
end
@doc """
Closes external program's standard output pipe (stdout)
Only owner of the pipe can close the pipe. This call will return
immediately.
"""
@spec close_stdout(t) :: :ok | {:error, any()}
def close_stdout(process) do
GenServer.call(process.pid, {:close_pipe, :stdout}, :infinity)
end
@doc """
Closes external program's standard error pipe (stderr)
Only owner of the pipe can close the pipe. This call will return
immediately.
"""
@spec close_stderr(t) :: :ok | {:error, any()}
def close_stderr(process) do
GenServer.call(process.pid, {:close_pipe, :stderr}, :infinity)
end
@doc """
Writes iodata `data` to external program's standard input pipe.
This call blocks when the pipe is full. Returns `:ok` when
the complete data is written.
"""
@spec write(t, binary) :: :ok | {:error, any()}
def write(process, iodata) do
binary = IO.iodata_to_binary(iodata)
GenServer.call(process.pid, {:write_stdin, binary}, :infinity)
end
@doc """
Returns bytes from executed command's stdout with maximum size `max_size`.
Blocks if no data present in stdout pipe yet. And returns as soon as
data of any size is available.
Note that `max_size` is the maximum size of the returned data. But
the returned data can be less than that depending on how the program
flushes the data etc.
"""
@spec read(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
def read(process, max_size \\ @default_buffer_size)
when is_integer(max_size) and max_size > 0 do
GenServer.call(process.pid, {:read_stdout, max_size}, :infinity)
end
@doc """
Returns bytes from executed command's stderr with maximum size `max_size`.
- Pipe must be enabled with `enable_stderr: true` to read the data.
+ Pipe must be enabled with `stderr: :consume` to read the data.
Blocks if no bytes are written to stderr yet. And returns as soon as
bytes are available
Note that `max_size` is the maximum size of the returned data. But
the returned data can be less than that depending on how the program
flushes the data etc.
"""
@spec read_stderr(t, pos_integer()) :: {:ok, iodata} | :eof | {:error, any()}
def read_stderr(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
GenServer.call(process.pid, {:read_stderr, size}, :infinity)
end
@doc """
Returns bytes from either stdout or stderr with maximum size
`max_size` whichever is available at that time.
Blocks if no bytes are written to stdout or stderr yet. And returns
as soon as data is available.
Note that `max_size` is the maximum size of the returned data. But
the returned data can be less than that depending on how the program
flushes the data etc.
"""
@spec read_any(t, pos_integer()) ::
{:ok, {:stdout, iodata}} | {:ok, {:stderr, iodata}} | :eof | {:error, any()}
def read_any(process, size \\ @default_buffer_size) when is_integer(size) and size > 0 do
GenServer.call(process.pid, {:read_stdout_or_stderr, size}, :infinity)
end
@doc """
Changes the Pipe owner of the pipe to specified pid.
Note that currently any process can change the pipe owner.
For more details about Pipe Owner, please check module docs.
"""
@spec change_pipe_owner(t, pipe_name, pid) :: :ok | {:error, any()}
def change_pipe_owner(process, pipe_name, target_owner_pid) do
GenServer.call(
process.pid,
{:change_pipe_owner, pipe_name, target_owner_pid},
:infinity
)
end
@doc """
Sends a system signal to the external program.
Note that `:sigkill` kills the program unconditionally.
Avoid sending signals manually; use `await_exit` instead.
"""
@spec kill(t, :sigkill | :sigterm) :: :ok
def kill(process, signal) when signal in [:sigkill, :sigterm] do
GenServer.call(process.pid, {:kill, signal}, :infinity)
end
@doc """
Waits for the program to terminate and returns the exit status.
**ONLY** the process owner can call this function. And all Exile
**processes MUST** be awaited (similar to Task).
Exile first politely asks the program to terminate by closing the
pipes owned by the process owner (by default the process owner is the
pipe owner). Most programs terminate when their standard pipes are
closed.
If you have changed the pipe owner to another process, you have to
close the pipe yourself or wait for the program to exit.
If the program fails to terminate within the timeout (default 5s)
then the program will be killed using the exit sequence, by sending
`SIGTERM` and `SIGKILL` signals in sequence.
When the timeout is set to `:infinity`, `await_exit` waits for the
program to terminate indefinitely.
For more details check the module documentation.
"""
@spec await_exit(t, timeout :: timeout()) :: {:ok, exit_status}
def await_exit(process, timeout \\ 5000) do
%__MODULE__{
monitor_ref: monitor_ref,
exit_ref: exit_ref,
owner: owner,
pid: pid
} = process
if self() != owner do
raise ArgumentError,
"task #{inspect(process)} exit status can only be queried by owner but was queried from #{inspect(self())}"
end
graceful_exit_timeout =
if timeout == :infinity do
:infinity
else
# process exit steps should finish before receive timeout exceeds
# receive timeout is max allowed time for the `await_exit` call to block
max(0, timeout - 100)
end
:ok = GenServer.cast(pid, {:prepare_exit, owner, graceful_exit_timeout})
receive do
{^exit_ref, exit_status} ->
Process.demonitor(monitor_ref, [:flush])
exit_status
{:DOWN, ^monitor_ref, _, _proc, reason} ->
exit({reason, {__MODULE__, :await_exit, [process, timeout]}})
after
# ideally we should never hit this case since the process must
# be terminated before the timeout and we should have received
# `DOWN` message
timeout ->
exit({:timeout, {__MODULE__, :await_exit, [process, timeout]}})
end
end
@doc """
Returns OS pid of the command
This is meant only for debugging. Avoid interacting with the
external process directly
"""
@spec os_pid(t) :: pos_integer()
def os_pid(process) do
GenServer.call(process.pid, :os_pid, :infinity)
end
## Server
@impl true
def init(args) do
- {enable_stderr, args} = Map.pop(args, :enable_stderr)
+ {stderr, args} = Map.pop(args, :stderr)
{owner, args} = Map.pop!(args, :owner)
{exit_ref, args} = Map.pop!(args, :exit_ref)
state = %State{
args: args,
owner: owner,
status: :init,
- enable_stderr: enable_stderr,
+ stderr: stderr,
operations: Operations.new(),
exit_ref: exit_ref,
monitor_ref: Process.monitor(owner)
}
{:ok, state, {:continue, nil}}
end
@impl true
def handle_continue(nil, state) do
{:noreply, exec(state)}
end
@impl true
def handle_cast({:prepare_exit, caller, timeout}, state) do
state =
Enum.reduce(state.pipes, state, fn {_pipe_name, pipe}, state ->
case Pipe.close(pipe, caller) do
{:ok, pipe} ->
{:ok, state} = State.put_pipe(state, pipe.name, pipe)
state
{:error, _} ->
state
end
end)
case maybe_shutdown(state) do
{:stop, :normal, state} ->
{:stop, :normal, state}
{:noreply, state} ->
if timeout == :infinity do
{:noreply, state}
else
total_stages = 3
stage_timeout = div(timeout, total_stages)
handle_info({:prepare_exit, :normal_exit, stage_timeout}, state)
end
end
end
@impl true
def handle_call({:change_pipe_owner, pipe_name, new_owner}, _from, state) do
with {:ok, pipe} <- State.pipe(state, pipe_name),
{:ok, new_pipe} <- Pipe.set_owner(pipe, new_owner),
{:ok, state} <- State.put_pipe(state, pipe_name, new_pipe) do
{:reply, :ok, state}
else
{:error, _} = error ->
{:reply, error, state}
end
end
def handle_call({:close_pipe, pipe_name}, {caller, _} = from, state) do
with {:ok, pipe} <- State.pipe(state, pipe_name),
{:ok, new_pipe} <- Pipe.close(pipe, caller),
:ok <- GenServer.reply(from, :ok),
{:ok, new_state} <- State.put_pipe(state, pipe_name, new_pipe) do
maybe_shutdown(new_state)
else
{:error, _} = ret ->
{:reply, ret, state}
end
end
def handle_call({:read_stdout, size}, from, state) do
case Operations.read(state, {:read_stdout, from, size}) do
{:noreply, state} ->
{:noreply, state}
ret ->
{:reply, ret, state}
end
end
def handle_call({:read_stderr, size}, from, state) do
case Operations.read(state, {:read_stderr, from, size}) do
{:noreply, state} ->
{:noreply, state}
ret ->
{:reply, ret, state}
end
end
def handle_call({:read_stdout_or_stderr, size}, from, state) do
case Operations.read_any(state, {:read_stdout_or_stderr, from, size}) do
{:noreply, state} ->
{:noreply, state}
ret ->
{:reply, ret, state}
end
end
def handle_call({:write_stdin, binary}, from, state) do
case Operations.write(state, {:write_stdin, from, binary}) do
{:noreply, state} ->
{:noreply, state}
ret ->
{:reply, ret, state}
end
end
def handle_call(:os_pid, _from, state) do
case Port.info(state.port, :os_pid) do
{:os_pid, os_pid} ->
{:reply, {:ok, os_pid}, state}
nil ->
Logger.debug("Process not alive")
{:reply, :undefined, state}
end
end
def handle_call({:kill, signal}, _from, state) do
{:reply, signal(state.port, signal), state}
end
@impl true
def handle_info({:prepare_exit, current_stage, timeout}, %{status: status, port: port} = state) do
cond do
status != :running ->
{:noreply, state}
current_stage == :normal_exit ->
Elixir.Process.send_after(self(), {:prepare_exit, :sigterm, timeout}, timeout)
{:noreply, state}
current_stage == :sigterm ->
signal(port, :sigterm)
Elixir.Process.send_after(self(), {:prepare_exit, :sigkill, timeout}, timeout)
{:noreply, state}
current_stage == :sigkill ->
signal(port, :sigkill)
Elixir.Process.send_after(self(), {:prepare_exit, :stop, timeout}, timeout)
{:noreply, state}
# this should never happen, since sigkill signal can not be ignored by the OS process
current_stage == :stop ->
{:stop, :sigkill_timeout, state}
end
end
def handle_info({:select, write_resource, _ref, :ready_output}, state) do
:stdin = State.pipe_name_for_fd(state, write_resource)
with {:ok, {:write_stdin, from, _bin} = operation, state} <-
State.pop_operation(state, :write_stdin) do
case Operations.write(state, operation) do
{:noreply, state} ->
{:noreply, state}
ret ->
GenServer.reply(from, ret)
{:noreply, state}
end
end
end
def handle_info({:select, read_resource, _ref, :ready_input}, state) do
pipe_name = State.pipe_name_for_fd(state, read_resource)
with {:ok, operation_name} <- Operations.match_pending_operation(state, pipe_name),
{:ok, {_, from, _} = operation, state} <- State.pop_operation(state, operation_name) do
ret =
case operation_name do
:read_stdout_or_stderr ->
Operations.read_any(state, operation)
name when name in [:read_stdout, :read_stderr] ->
Operations.read(state, operation)
end
case ret do
{:noreply, state} ->
{:noreply, state}
ret ->
GenServer.reply(from, ret)
{:noreply, state}
end
else
{:error, _error} ->
{:noreply, state}
end
end
def handle_info({:stop, :sigterm}, state) do
if state.status == :running do
signal(state.port, :sigkill)
Elixir.Process.send_after(self(), {:stop, :sigkill}, @os_signal_timeout)
end
{:noreply, state}
end
def handle_info({:stop, :sigkill}, state) do
if state.status == :running do
# this should never happen, since sigkill signal can not be handled
{:stop, :sigkill_timeout, state}
else
{:noreply, state}
end
end
def handle_info({port, {:exit_status, exit_status}}, %{port: port} = state) do
send(state.owner, {state.exit_ref, {:ok, exit_status}})
state = State.set_status(state, {:exit, exit_status})
maybe_shutdown(state)
end
# we are only interested in Port exit signals
def handle_info({:EXIT, port, reason}, %State{port: port} = state) when reason != :normal do
send(state.owner, {state.exit_ref, {:error, reason}})
state = State.set_status(state, {:exit, {:error, reason}})
maybe_shutdown(state)
end
def handle_info({:EXIT, port, :normal}, %{port: port} = state) do
maybe_shutdown(state)
end
# shutdown unconditionally when process owner exit normally.
# Since Exile process is linked to the owner, in case of owner crash,
# exile process will be killed by the VM.
def handle_info(
{:DOWN, owner_ref, :process, _pid, reason},
%State{monitor_ref: owner_ref} = state
) do
{:stop, reason, state}
end
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
state =
Enum.reduce(state.pipes, state, fn {_pipe_name, pipe}, state ->
case Pipe.close(pipe, pid) do
{:ok, pipe} ->
{:ok, state} = State.put_pipe(state, pipe.name, pipe)
state
{:error, _} ->
state
end
end)
maybe_shutdown(state)
end
@type signal :: :sigkill | :sigterm
@spec signal(port, signal) :: :ok | {:error, :invalid_signal} | {:error, :process_not_alive}
defp signal(port, signal) do
with true <- signal in [:sigkill, :sigterm],
{:os_pid, os_pid} <- Port.info(port, :os_pid) do
Nif.nif_kill(os_pid, signal)
else
false ->
{:error, :invalid_signal}
nil ->
{:error, :process_not_alive}
end
end
@spec maybe_shutdown(State.t()) :: {:stop, :normal, State.t()} | {:noreply, State.t()}
defp maybe_shutdown(state) do
open_pipes_count =
state.pipes
|> Map.values()
|> Enum.count(&Pipe.open?/1)
if open_pipes_count == 0 && !(state.status in [:init, :running]) do
{:stop, :normal, state}
else
{:noreply, state}
end
end
@spec exec(State.t()) :: State.t()
defp exec(state) do
%{
port: port,
stdin: stdin_fd,
stdout: stdout_fd,
stderr: stderr_fd
- } = Exec.start(state.args, state.enable_stderr)
+ } = Exec.start(state.args, state.stderr)
stderr =
- if state.enable_stderr do
+ if state.stderr == :consume do
Pipe.new(:stderr, stderr_fd, state.owner)
else
Pipe.new(:stderr)
end
%State{
state
| port: port,
status: :running,
pipes: %{
stdin: Pipe.new(:stdin, stdin_fd, state.owner),
stdout: Pipe.new(:stdout, stdout_fd, state.owner),
stderr: stderr
}
}
end
end
diff --git a/lib/exile/process/exec.ex b/lib/exile/process/exec.ex
index be7468c..d5f0ceb 100644
--- a/lib/exile/process/exec.ex
+++ b/lib/exile/process/exec.ex
@@ -1,214 +1,221 @@
defmodule Exile.Process.Exec do
@moduledoc false
alias Exile.Process.Nif
alias Exile.Process.Pipe
@type args :: %{
cmd_with_args: [String.t()],
cd: String.t(),
env: [{String.t(), String.t()}]
}
@spec start(args, :console | :disable | :consume) :: %{
port: port,
stdin: non_neg_integer(),
stdout: non_neg_integer(),
stderr: non_neg_integer()
}
def start(
%{
cmd_with_args: cmd_with_args,
cd: cd,
env: env
},
- enable_stderr
+ stderr
) do
socket_path = socket_path()
{:ok, sock} = :socket.open(:local, :stream, :default)
try do
:ok = socket_bind(sock, socket_path)
:ok = :socket.listen(sock)
- spawner_cmdline_args = [socket_path, to_string(enable_stderr) | cmd_with_args]
+ spawner_cmdline_args = [socket_path, to_string(stderr) | cmd_with_args]
port_opts =
[:nouse_stdio, :exit_status, :binary, args: spawner_cmdline_args] ++
prune_nils(env: env, cd: cd)
port = Port.open({:spawn_executable, spawner_path()}, port_opts)
{:os_pid, os_pid} = Port.info(port, :os_pid)
Exile.Watcher.watch(self(), os_pid, socket_path)
- {stdin_fd, stdout_fd, stderr_fd} = receive_fds(sock, enable_stderr)
+ {stdin_fd, stdout_fd, stderr_fd} = receive_fds(sock, stderr)
%{port: port, stdin: stdin_fd, stdout: stdout_fd, stderr: stderr_fd}
after
:socket.close(sock)
File.rm!(socket_path)
end
end
@spec normalize_exec_args(nonempty_list(), keyword()) ::
- {:ok, %{cmd_with_args: nonempty_list(), cd: charlist, env: env, enable_stderr: boolean}}
+ {:ok,
+ %{
+ cmd_with_args: nonempty_list(),
+ cd: charlist,
+ env: env,
+ stderr: :console | :disable | :consume
+ }}
| {:error, String.t()}
def normalize_exec_args(cmd_with_args, opts) do
with {:ok, cmd} <- normalize_cmd(cmd_with_args),
{:ok, args} <- normalize_cmd_args(cmd_with_args),
:ok <- validate_opts_fields(opts),
{:ok, cd} <- normalize_cd(opts[:cd]),
- {:ok, enable_stderr} <- normalize_enable_stderr(opts[:enable_stderr]),
+ {:ok, stderr} <- normalize_stderr(opts[:stderr]),
{:ok, env} <- normalize_env(opts[:env]) do
- {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, enable_stderr: enable_stderr}}
+ {:ok, %{cmd_with_args: [cmd | args], cd: cd, env: env, stderr: stderr}}
end
end
@spec spawner_path :: String.t()
defp spawner_path do
:filename.join(:code.priv_dir(:exile), "spawner")
end
@socket_timeout 2000
@spec receive_fds(:socket.socket(), :console | :disable | :consume) :: {Pipe.fd(), Pipe.fd(), Pipe.fd()}
- defp receive_fds(lsock, enable_stderr) do
+ defp receive_fds(lsock, stderr) do
{:ok, sock} = :socket.accept(lsock, @socket_timeout)
try do
{:ok, msg} = :socket.recvmsg(sock, @socket_timeout)
%{ctrl: [%{data: data, level: :socket, type: :rights}]} = msg
<<stdin_fd::native-32, stdout_fd::native-32, stderr_fd::native-32, _::binary>> = data
# FDs are managed by the NIF resource life-cycle
{:ok, stdout} = Nif.nif_create_fd(stdout_fd)
{:ok, stdin} = Nif.nif_create_fd(stdin_fd)
{:ok, stderr} =
- if enable_stderr do
+ if stderr == :consume do
Nif.nif_create_fd(stderr_fd)
else
{:ok, nil}
end
{stdin, stdout, stderr}
after
:socket.close(sock)
end
end
# skip type warning till we change min OTP version to 24.
@dialyzer {:nowarn_function, socket_bind: 2}
defp socket_bind(sock, path) do
case :socket.bind(sock, %{family: :local, path: path}) do
:ok -> :ok
# for compatibility with OTP version < 24
{:ok, _} -> :ok
other -> other
end
end
@spec socket_path() :: String.t()
defp socket_path do
str = :crypto.strong_rand_bytes(16) |> Base.url_encode64() |> binary_part(0, 16)
path = Path.join(System.tmp_dir!(), str)
_ = :file.delete(path)
path
end
@spec prune_nils(keyword()) :: keyword()
defp prune_nils(kv) do
Enum.reject(kv, fn {_, v} -> is_nil(v) end)
end
@spec normalize_cmd(nonempty_list()) :: {:ok, nonempty_list()} | {:error, binary()}
defp normalize_cmd(arg) do
case arg do
[cmd | _] when is_binary(cmd) ->
path = System.find_executable(cmd)
if path do
{:ok, to_charlist(path)}
else
{:error, "command not found: #{inspect(cmd)}"}
end
_ ->
{:error, "`cmd_with_args` must be a list of strings, Please check the documentation"}
end
end
defp normalize_cmd_args([_ | args]) do
if Enum.all?(args, &is_binary/1) do
{:ok, Enum.map(args, &to_charlist/1)}
else
{:error, "command arguments must be list of strings. #{inspect(args)}"}
end
end
@spec normalize_cd(binary) :: {:ok, charlist()} | {:error, String.t()}
defp normalize_cd(cd) do
case cd do
nil ->
{:ok, ''}
cd when is_binary(cd) ->
if File.exists?(cd) && File.dir?(cd) do
{:ok, to_charlist(cd)}
else
{:error, "`:cd` must be valid directory path"}
end
_ ->
{:error, "`:cd` must be a binary string"}
end
end
@type env :: list({String.t(), String.t()})
@spec normalize_env(env) :: {:ok, env} | {:error, String.t()}
defp normalize_env(env) do
case env do
nil ->
{:ok, []}
env when is_list(env) or is_map(env) ->
env =
Enum.map(env, fn {key, value} ->
{to_charlist(key), to_charlist(value)}
end)
{:ok, env}
_ ->
{:error, "`:env` must be a map or list of `{string, string}`"}
end
end
- @spec normalize_enable_stderr(enable_stderr :: boolean) :: {:ok, boolean} | {:error, String.t()}
- defp normalize_enable_stderr(enable_stderr) do
- case enable_stderr do
+ @spec normalize_stderr(stderr :: :console | :disable | :consume | nil) ::
+ {:ok, :console | :disable | :consume} | {:error, String.t()}
+ defp normalize_stderr(stderr) do
+ case stderr do
nil ->
- {:ok, false}
+ {:ok, :console}
- enable_stderr when is_boolean(enable_stderr) ->
- {:ok, enable_stderr}
+ stderr when stderr in [:console, :disable, :consume] ->
+ {:ok, stderr}
_ ->
- {:error, ":enable_stderr must be a boolean"}
+ {:error, ":stderr must be an atom and one of :console, :disable, :consume"}
end
end
@spec validate_opts_fields(keyword) :: :ok | {:error, String.t()}
defp validate_opts_fields(opts) do
- {_, additional_opts} = Keyword.split(opts, [:cd, :env, :enable_stderr])
+ {_, additional_opts} = Keyword.split(opts, [:cd, :env, :stderr])
if Enum.empty?(additional_opts) do
:ok
else
{:error, "invalid opts: #{inspect(additional_opts)}"}
end
end
end
diff --git a/lib/exile/process/state.ex b/lib/exile/process/state.ex
index 8775f8e..678c07f 100644
--- a/lib/exile/process/state.ex
+++ b/lib/exile/process/state.ex
@@ -1,99 +1,99 @@
defmodule Exile.Process.State do
@moduledoc false
alias Exile.Process.Exec
alias Exile.Process.Operations
alias Exile.Process.Pipe
alias __MODULE__
@type read_mode :: :stdout | :stderr | :stdout_or_stderr
@type pipes :: %{
stdin: Pipe.t(),
stdout: Pipe.t(),
stderr: Pipe.t()
}
@type status ::
:init
| :running
| {:exit, non_neg_integer()}
| {:exit, {:error, error :: term}}
@type t :: %__MODULE__{
args: Exec.args(),
owner: pid,
port: port(),
pipes: pipes,
status: status,
- enable_stderr: boolean(),
+ stderr: :console | :disable | :consume,
operations: Operations.t(),
exit_ref: reference(),
monitor_ref: reference()
}
defstruct [
:args,
:owner,
:port,
:pipes,
:status,
- :enable_stderr,
+ :stderr,
:operations,
:exit_ref,
:monitor_ref
]
alias __MODULE__
@spec pipe(State.t(), name :: Pipe.name()) :: {:ok, Pipe.t()} | {:error, :invalid_name}
def pipe(%State{} = state, name) do
if name in [:stdin, :stdout, :stderr] do
{:ok, Map.fetch!(state.pipes, name)}
else
{:error, :invalid_name}
end
end
@spec put_pipe(State.t(), name :: Pipe.name(), Pipe.t()) :: {:ok, t} | {:error, :invalid_name}
def put_pipe(%State{} = state, name, pipe) do
if name in [:stdin, :stdout, :stderr] do
pipes = Map.put(state.pipes, name, pipe)
state = %State{state | pipes: pipes}
{:ok, state}
else
{:error, :invalid_name}
end
end
@spec pipe_name_for_fd(State.t(), fd :: Pipe.fd()) :: Pipe.name()
def pipe_name_for_fd(state, fd) do
pipe =
state.pipes
|> Map.values()
|> Enum.find(&(&1.fd == fd))
pipe.name
end
@spec put_operation(State.t(), Operations.operation()) :: {:ok, t} | {:error, term}
def put_operation(%State{operations: ops} = state, operation) do
with {:ok, ops} <- Operations.put(ops, operation) do
{:ok, %State{state | operations: ops}}
end
end
@spec pop_operation(State.t(), Operations.name()) ::
{:ok, Operations.operation(), t} | {:error, term}
def pop_operation(%State{operations: ops} = state, name) do
with {:ok, operation, ops} <- Operations.pop(ops, name) do
{:ok, operation, %State{state | operations: ops}}
end
end
@spec set_status(State.t(), status) :: State.t()
def set_status(state, status) do
%State{state | status: status}
end
end
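
The state struct now carries the stderr mode atom instead of a boolean flag. `Exile.Process.State` is an internal module, so the following is purely illustrative; it mirrors how the tests below peek at the state via `:sys.get_state/1`, and assumes the field is populated at init:

```elixir
alias Exile.Process
alias Exile.Process.State

{:ok, proc} = Process.start_link(~w(cat), stderr: :disable)

# the GenServer state records the stderr mode as an atom
%State{stderr: :disable} = :sys.get_state(proc.pid)

:ok = Process.close_stdin(proc)
{:ok, 0} = Process.await_exit(proc)
```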
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index 4ba734e..0a4d23e 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,352 +1,352 @@
defmodule Exile.Stream do
@moduledoc """
Defines a `Exile.Stream` struct returned by `Exile.stream!/2`.
"""
alias Exile.Process
alias Exile.Process.Error
require Logger
defmodule AbnormalExit do
defexception [:message, :exit_status]
@impl true
def exception(:epipe) do
msg = "program exited due to :epipe error"
%__MODULE__{message: msg, exit_status: :epipe}
end
def exception(exit_status) do
msg = "program exited with exit status: #{exit_status}"
%__MODULE__{message: msg, exit_status: exit_status}
end
end
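
As a quick illustration of how callers can use the `exit_status` field, mirroring the `stream!/2` abnormal-exit test at the bottom of this diff:

```elixir
# rescue the exception to get the numeric exit status (or :epipe)
status =
  try do
    Exile.stream!(["sh", "-c", "exit 5"]) |> Enum.to_list()
  rescue
    e in Exile.Stream.AbnormalExit -> e.exit_status
  end

# status == 5
```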
defmodule Sink do
@moduledoc false
@type t :: %__MODULE__{process: Process.t(), ignore_epipe: boolean}
defstruct [:process, :ignore_epipe]
defimpl Collectable do
def into(%{process: process}) do
collector_fun = fn
:ok, {:cont, x} ->
case Process.write(process, x) do
{:error, :epipe} ->
# there is no way to stop a Collectable other than raising an
# error; we catch this error and return `{:error, :epipe}`
raise Error, "epipe"
:ok ->
:ok
end
acc, :done ->
acc
acc, :halt ->
acc
end
{:ok, collector_fun}
end
end
end
defstruct [:stream_opts, :process_opts, :cmd_with_args]
@typedoc "Struct members are private, do not depend on them"
@type t :: %__MODULE__{
stream_opts: map(),
process_opts: keyword(),
cmd_with_args: [String.t()]
}
@stream_opts [
:exit_timeout,
:max_chunk_size,
:input,
- :enable_stderr,
+ :stderr,
:ignore_epipe,
:stream_exit_status
]
@doc false
@spec __build__(nonempty_list(String.t()), keyword()) :: t()
def __build__(cmd_with_args, opts) do
{stream_opts, process_opts} = Keyword.split(opts, @stream_opts)
case normalize_stream_opts(stream_opts) do
{:ok, stream_opts} ->
%Exile.Stream{
stream_opts: stream_opts,
process_opts: process_opts,
cmd_with_args: cmd_with_args
}
{:error, error} ->
raise ArgumentError, message: error
end
end
defimpl Enumerable do
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def reduce(arg, acc, fun) do
start_fun = fn ->
state = start_process(arg)
{state, :running}
end
next_fun = fn
{state, :exited} ->
{:halt, {state, :exited}}
{state, exit_state} ->
%{
process: process,
stream_opts: %{
- enable_stderr: enable_stderr,
+ stderr: stderr,
stream_exit_status: stream_exit_status,
max_chunk_size: max_chunk_size
}
} = state
case Process.read_any(process, max_chunk_size) do
:eof when stream_exit_status == false ->
{:halt, {state, :eof}}
:eof when stream_exit_status == true ->
elem = [await_exit(state, :eof)]
{elem, {state, :exited}}
- {:ok, {:stdout, x}} when enable_stderr == false ->
+ {:ok, {:stdout, x}} when stderr != :consume ->
elem = [IO.iodata_to_binary(x)]
{elem, {state, exit_state}}
- {:ok, {io_stream, x}} when enable_stderr == true ->
+ {:ok, {io_stream, x}} when stderr == :consume ->
elem = [{io_stream, IO.iodata_to_binary(x)}]
{elem, {state, exit_state}}
{:error, errno} ->
raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
end
end
after_fun = fn
{_state, :exited} ->
:ok
{state, exit_state} ->
case await_exit(state, exit_state) do
{:exit, {:status, 0}} ->
:ok
{:exit, {:status, exit_status}} ->
raise AbnormalExit, exit_status
{:exit, :epipe} ->
raise AbnormalExit, :epipe
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
def count(_stream) do
{:error, __MODULE__}
end
def member?(_stream, _term) do
{:error, __MODULE__}
end
def slice(_stream) do
{:error, __MODULE__}
end
defp start_process(%Exile.Stream{
process_opts: process_opts,
stream_opts: stream_opts,
cmd_with_args: cmd_with_args
}) do
- process_opts = Keyword.put(process_opts, :enable_stderr, stream_opts[:enable_stderr])
+ process_opts = Keyword.put(process_opts, :stderr, stream_opts[:stderr])
{:ok, process} = Process.start_link(cmd_with_args, process_opts)
sink = %Sink{process: process, ignore_epipe: stream_opts[:ignore_epipe]}
writer_task = start_input_streamer(sink, stream_opts.input)
%{process: process, stream_opts: stream_opts, writer_task: writer_task}
end
@doc false
@spec start_input_streamer(term, term) :: Task.t()
defp start_input_streamer(%Sink{process: process} = sink, input) do
case input do
:no_input ->
# use `Task.completed(:ok)` when bumping min Elixir requirement
Task.async(fn -> :ok end)
{:enumerable, enum} ->
Task.async(fn ->
Process.change_pipe_owner(process, :stdin, self())
try do
Enum.into(enum, sink)
rescue
Error ->
{:error, :epipe}
end
end)
{:collectable, func} ->
Task.async(fn ->
Process.change_pipe_owner(process, :stdin, self())
try do
func.(sink)
rescue
Error ->
{:error, :epipe}
end
end)
end
end
defp await_exit(state, exit_state) do
%{
process: process,
stream_opts: %{ignore_epipe: ignore_epipe, exit_timeout: exit_timeout},
writer_task: writer_task
} = state
result = Process.await_exit(process, exit_timeout)
writer_task_status = Task.await(writer_task)
case {exit_state, result, writer_task_status} do
# if the reader exits early and there is a pending write
{:running, {:ok, _status}, {:error, :epipe}} when ignore_epipe ->
{:exit, {:status, 0}}
# if the reader exits early and there is no pending write, or if
# there is no writer
{:running, {:ok, _status}, :ok} when ignore_epipe ->
{:exit, {:status, 0}}
# if we get epipe from the writer, raise that error and ignore the exit status
{:running, {:ok, _status}, {:error, :epipe}} when ignore_epipe == false ->
{:exit, :epipe}
# Normal exit success case
{_, {:ok, 0}, _} ->
{:exit, {:status, 0}}
{:eof, {:ok, exit_status}, _} ->
{:exit, {:status, exit_status}}
end
end
end
@spec normalize_input(term) ::
{:ok, :no_input} | {:ok, {:enumerable, term}} | {:ok, {:collectable, function}}
defp normalize_input(term) do
cond do
is_nil(term) ->
{:ok, :no_input}
!is_function(term, 1) && Enumerable.impl_for(term) ->
{:ok, {:enumerable, term}}
is_function(term, 1) ->
{:ok, {:collectable, term}}
true ->
{:error, "`:input` must be either Enumerable or a function which accepts collectable"}
end
end
defp normalize_max_chunk_size(max_chunk_size) do
case max_chunk_size do
nil ->
{:ok, 65_536}
max_chunk_size when is_integer(max_chunk_size) and max_chunk_size > 0 ->
{:ok, max_chunk_size}
_ ->
{:error, ":max_chunk_size must be a positive integer"}
end
end
defp normalize_exit_timeout(timeout) do
case timeout do
nil ->
{:ok, 5000}
timeout when is_integer(timeout) and timeout > 0 ->
{:ok, timeout}
_ ->
{:error, ":exit_timeout must be either :infinity or an integer"}
end
end
- defp normalize_enable_stderr(enable_stderr) do
- case enable_stderr do
+ defp normalize_stderr(stderr) do
+ case stderr do
nil ->
- {:ok, false}
+ {:ok, :console}
- enable_stderr when is_boolean(enable_stderr) ->
- {:ok, enable_stderr}
+ stderr when stderr in [:console, :disable, :consume] ->
+ {:ok, stderr}
_ ->
- {:error, ":enable_stderr must be a boolean"}
+ {:error, ":stderr must be an atom and one of :console, :disable, :consume"}
end
end
defp normalize_ignore_epipe(ignore_epipe) do
case ignore_epipe do
nil ->
{:ok, false}
ignore_epipe when is_boolean(ignore_epipe) ->
{:ok, ignore_epipe}
_ ->
{:error, ":ignore_epipe must be a boolean"}
end
end
defp normalize_stream_exit_status(stream_exit_status) do
case stream_exit_status do
nil ->
{:ok, false}
stream_exit_status when is_boolean(stream_exit_status) ->
{:ok, stream_exit_status}
_ ->
{:error, ":stream_exit_status must be a boolean"}
end
end
defp normalize_stream_opts(opts) do
with {:ok, input} <- normalize_input(opts[:input]),
{:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]),
{:ok, max_chunk_size} <- normalize_max_chunk_size(opts[:max_chunk_size]),
- {:ok, enable_stderr} <- normalize_enable_stderr(opts[:enable_stderr]),
+ {:ok, stderr} <- normalize_stderr(opts[:stderr]),
{:ok, ignore_epipe} <- normalize_ignore_epipe(opts[:ignore_epipe]),
{:ok, stream_exit_status} <- normalize_stream_exit_status(opts[:stream_exit_status]) do
{:ok,
%{
input: input,
exit_timeout: exit_timeout,
max_chunk_size: max_chunk_size,
- enable_stderr: enable_stderr,
+ stderr: stderr,
ignore_epipe: ignore_epipe,
stream_exit_status: stream_exit_status
}}
end
end
end
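
Taken together, the effect on `Exile.stream!/2` output is that the default streams only stdout as plain binaries, while `stderr: :consume` yields tagged tuples. A sketch consistent with the `reduce/3` clauses above and the tests below (exact chunking and interleaving order are not guaranteed):

```elixir
# default (`stderr: :console`): only stdout, plain binaries
Exile.stream!(["sh", "-c", "echo out"]) |> Enum.to_list()
#=> ["out\n"]

# `stderr: :consume`: both streams, tagged as {:stdout, data} / {:stderr, data}
Exile.stream!(["sh", "-c", "echo out; echo err >&2"], stderr: :consume)
|> Enum.to_list()
#=> for example [{:stdout, "out\n"}, {:stderr, "err\n"}]
```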
diff --git a/test/exile/process_test.exs b/test/exile/process_test.exs
index 1fc4ba1..9dde351 100644
--- a/test/exile/process_test.exs
+++ b/test/exile/process_test.exs
@@ -1,636 +1,637 @@
defmodule Exile.ProcessTest do
use ExUnit.Case, async: true
alias Exile.Process
alias Exile.Process.{Pipe, State}
doctest Exile.Process
describe "pipes" do
test "reading from stdout" do
{:ok, s} = Process.start_link(~w(echo test))
:timer.sleep(100)
assert {:ok, iodata} = Process.read(s, 100)
assert :eof = Process.read(s, 100)
assert IO.iodata_to_binary(iodata) == "test\n"
assert :ok == Process.close_stdin(s)
assert :ok == Process.close_stdout(s)
assert {:ok, 0} == Process.await_exit(s, 500)
refute Elixir.Process.alive?(s.pid)
end
test "write to stdin" do
{:ok, s} = Process.start_link(~w(cat))
assert :ok == Process.write(s, "hello")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "hello"
assert :ok == Process.write(s, "world")
assert {:ok, iodata} = Process.read(s, 5)
assert IO.iodata_to_binary(iodata) == "world"
assert :ok == Process.close_stdin(s)
assert :eof == Process.read(s)
assert {:ok, 0} == Process.await_exit(s, 100)
:timer.sleep(100)
refute Elixir.Process.alive?(s.pid)
end
test "when stdin is closed" do
logger = start_events_collector()
# base64 produces output only after getting EOF from stdin. we
# collect events in order and assert that we can still read from
# stdout even after closing stdin
{:ok, s} = Process.start_link(~w(base64))
# the parallel reader should be blocked until we close stdin
start_parallel_reader(s, logger)
:timer.sleep(100)
assert :ok == Process.write(s, "hello")
add_event(logger, {:write, "hello"})
assert :ok == Process.write(s, "world")
add_event(logger, {:write, "world"})
:timer.sleep(100)
assert :ok == Process.close_stdin(s)
add_event(logger, :input_close)
assert {:ok, 0} == Process.await_exit(s)
# Process.stop(s)
# wait for the reader to read
Elixir.Process.sleep(500)
assert [
{:write, "hello"},
{:write, "world"},
:input_close,
{:read, "aGVsbG93b3JsZA==\n"},
:eof
] == get_events(logger)
end
test "reading from stderr" do
- {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: true)
+ {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], stderr: :consume)
assert {:ok, "foo\n"} = Process.read_stderr(s, 100)
end
test "reading from stdout or stderr using read_any" do
script = """
echo "foo"
echo "bar" >&2
"""
- {:ok, s} = Process.start_link(["sh", "-c", script], enable_stderr: true)
+ {:ok, s} = Process.start_link(["sh", "-c", script], stderr: :consume)
{:ok, ret1} = Process.read_any(s, 100)
{:ok, ret2} = Process.read_any(s, 100)
assert {:stderr, "bar\n"} in [ret1, ret2]
assert {:stdout, "foo\n"} in [ret1, ret2]
assert :eof = Process.read_any(s, 100)
end
test "reading from stderr_read when stderr disabled" do
- {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: false)
+ {:ok, s} = Process.start_link(["sh", "-c", "echo foo >>/dev/stderr"], stderr: :console)
+
assert {:error, :pipe_closed_or_invalid_caller} = Process.read_stderr(s, 100)
end
test "read_any with stderr disabled" do
script = """
echo "foo"
echo "bar" >&2
"""
- {:ok, s} = Process.start_link(["sh", "-c", script], enable_stderr: false)
+ {:ok, s} = Process.start_link(["sh", "-c", script], stderr: :console)
{:ok, ret} = Process.read_any(s, 100)
# we can still read from stdout even if stderr is disabled
assert ret == {:stdout, "foo\n"}
assert :eof = Process.read_any(s, 100)
end
test "if pipe gets closed on pipe owner exit normally" do
{:ok, s} = Process.start_link(~w(sleep 10000))
writer =
Task.async(fn ->
Process.change_pipe_owner(s, :stdin, self())
end)
# stdin must be closed on task completion
:ok = Task.await(writer)
assert %State{
pipes: %{
stdin: %Pipe{
name: :stdin,
fd: _,
monitor_ref: nil,
owner: nil,
status: :closed
},
# ensure other pipes are unaffected
stdout: %Pipe{
name: :stdout,
status: :open
}
}
} = :sys.get_state(s.pid)
end
test "if pipe gets closed on pipe owner is killed" do
{:ok, s} = Process.start_link(~w(sleep 10000))
writer =
spawn(fn ->
Process.change_pipe_owner(s, :stdin, self())
receive do
:block -> :ok
end
end)
# wait for pipe owner to change
:timer.sleep(100)
# stdin must be closed on process kill
true = Elixir.Process.exit(writer, :kill)
:timer.sleep(1000)
assert %State{
pipes: %{
stdin: %Pipe{
name: :stdin,
fd: _,
monitor_ref: nil,
owner: nil,
status: :closed
},
# ensure other pipes are unaffected
stdout: %Pipe{
name: :stdout,
status: :open
}
}
} = :sys.get_state(s.pid)
end
end
describe "process termination" do
test "if external program terminates on process exit" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
:ok = Process.close_stdin(s)
:timer.sleep(100)
refute os_process_alive?(os_pid)
end
test "watcher kills external command on process without exit_await" do
{os_pid, s} =
Task.async(fn ->
{:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
# ensure the script has set up the signal handlers (handlers that ignore the signal)
assert {:ok, "ignored signals\n"} = Process.read(s)
# exit without waiting for the exile process
{os_pid, s}
end)
|> Task.await()
:timer.sleep(500)
# Exile Process should exit after Task process terminates
refute Elixir.Process.alive?(s.pid)
refute os_process_alive?(os_pid)
end
test "await_exit with timeout" do
{:ok, s} = Process.start_link([fixture("ignore_sigterm.sh")])
{:ok, os_pid} = Process.os_pid(s)
assert os_process_alive?(os_pid)
assert {:ok, "ignored signals\n"} = Process.read(s)
# attempt to kill the process after 100ms
assert {:ok, 137} = Process.await_exit(s, 100)
refute os_process_alive?(os_pid)
refute Elixir.Process.alive?(s.pid)
end
test "exit status" do
{:ok, s} = Process.start_link(["sh", "-c", "exit 10"])
assert {:ok, 10} == Process.await_exit(s)
end
test "writing binary larger than pipe buffer size" do
large_bin = generate_binary(5 * 65_535)
{:ok, s} = Process.start_link(~w(cat))
writer =
Task.async(fn ->
Process.change_pipe_owner(s, :stdin, self())
Process.write(s, large_bin)
end)
:timer.sleep(100)
iodata =
Stream.unfold(nil, fn _ ->
case Process.read(s) do
{:ok, data} -> {data, nil}
:eof -> nil
end
end)
|> Enum.to_list()
Task.await(writer)
assert IO.iodata_length(iodata) == 5 * 65_535
assert {:ok, 0} == Process.await_exit(s, 500)
end
test "if exile process is terminated on owner exit even if pipe owner is alive" do
parent = self()
owner =
spawn(fn ->
# owner process terminated without await_exit
{:ok, s} = Process.start_link(~w(cat))
snd(parent, {:ok, s})
:exit = recv(parent)
end)
{:ok, s} = recv(owner)
spawn_link(fn ->
Process.change_pipe_owner(s, :stdin, self())
block()
end)
spawn_link(fn ->
Process.change_pipe_owner(s, :stdout, self())
block()
end)
# wait for pipe owner to change
:timer.sleep(500)
snd(owner, :exit)
# wait for messages to propagate, if there are any
:timer.sleep(500)
refute Elixir.Process.alive?(owner)
refute Elixir.Process.alive?(s.pid)
end
test "if exile process is *NOT* terminated on owner exit, if any pipe owner is alive" do
parent = self()
{:ok, s} = Process.start_link(~w(cat))
io_proc =
spawn_link(fn ->
:ok = Process.change_pipe_owner(s, :stdin, self())
:ok = Process.change_pipe_owner(s, :stdout, self())
recv(parent)
end)
# wait for pipe owner to change
:timer.sleep(100)
# external process will be killed with SIGTERM (143)
assert {:ok, 143} = Process.await_exit(s, 100)
# wait for messages to propagate, if there are any
:timer.sleep(100)
assert Elixir.Process.alive?(s.pid)
assert %State{
pipes: %{
stdin: %Pipe{status: :open},
stdout: %Pipe{status: :open}
}
} = :sys.get_state(s.pid)
# when the io_proc exits, the pipes should be closed and the process must terminate
snd(io_proc, :exit)
:timer.sleep(100)
refute Elixir.Process.alive?(s.pid)
end
test "when process is killed with a pending concurrent write" do
{:ok, s} = Process.start_link(~w(cat))
{:ok, os_pid} = Process.os_pid(s)
large_data =
Stream.cycle(["test"])
|> Stream.take(500_000)
|> Enum.to_list()
|> IO.iodata_to_binary()
task =
Task.async(fn ->
Process.change_pipe_owner(s, :stdin, self())
Process.write(s, large_data)
end)
# to avoid race conditions, such as the process being killed before the
# owner is changed
:timer.sleep(200)
assert {:ok, 1} == Process.await_exit(s)
refute os_process_alive?(os_pid)
assert {:error, :epipe} == Task.await(task)
end
test "if owner is killed when the exile process is killed" do
parent = self()
# create an exile process without linking to caller
owner =
spawn(fn ->
assert {:ok, s} = Process.start_link(~w(cat))
snd(parent, s.pid)
block()
end)
owner_ref = Elixir.Process.monitor(owner)
exile_pid = recv(owner)
exile_ref = Elixir.Process.monitor(exile_pid)
assert Elixir.Process.alive?(owner)
assert Elixir.Process.alive?(exile_pid)
true = Elixir.Process.exit(exile_pid, :kill)
assert_receive {:DOWN, ^owner_ref, :process, ^owner, :killed}
assert_receive {:DOWN, ^exile_ref, :process, ^exile_pid, :killed}
end
test "if exile process is killed when the owner is killed" do
parent = self()
# create an exile process without linking to caller
owner =
spawn(fn ->
assert {:ok, s} = Process.start_link(~w(cat))
snd(parent, s.pid)
block()
end)
owner_ref = Elixir.Process.monitor(owner)
exile_pid = recv(owner)
exile_ref = Elixir.Process.monitor(exile_pid)
assert Elixir.Process.alive?(owner)
assert Elixir.Process.alive?(exile_pid)
true = Elixir.Process.exit(owner, :kill)
assert_receive {:DOWN, ^owner_ref, :process, ^owner, :killed}
assert_receive {:DOWN, ^exile_ref, :process, ^exile_pid, :killed}
end
end
test "back-pressure" do
logger = start_events_collector()
# we test back-pressure by checking whether `write` is delayed when we delay reads
{:ok, s} = Process.start_link(~w(cat))
large_bin = generate_binary(65_535 * 5)
writer =
Task.async(fn ->
Process.change_pipe_owner(s, :stdin, self())
:ok = Process.write(s, large_bin)
add_event(logger, {:write, IO.iodata_length(large_bin)})
end)
:timer.sleep(50)
reader =
Task.async(fn ->
Process.change_pipe_owner(s, :stdout, self())
Stream.unfold(nil, fn _ ->
case Process.read(s) do
{:ok, data} ->
add_event(logger, {:read, IO.iodata_length(data)})
# delay in reading should delay writes
:timer.sleep(10)
{nil, nil}
:eof ->
nil
end
end)
|> Stream.run()
end)
Task.await(writer)
Task.await(reader)
assert {:ok, 0} == Process.await_exit(s)
events = get_events(logger)
{write_events, read_events} = Enum.split_with(events, &match?({:write, _}, &1))
assert Enum.sum(Enum.map(read_events, fn {:read, size} -> size end)) ==
Enum.sum(Enum.map(write_events, fn {:write, size} -> size end))
# There must be a read before write completes
assert hd(events) == {:read, 65_535}
end
# this test does not work properly on Linux
@tag :skip
test "if we are leaking file descriptor" do
{:ok, s} = Process.start_link(~w(sleep 60))
{:ok, os_pid} = Process.os_pid(s)
# we are only printing FD, TYPE, NAME with respective prefix
{bin, 0} = System.cmd("lsof", ["-F", "ftn", "-p", to_string(os_pid)])
open_files = parse_lsof(bin)
assert [
%{type: "PIPE", fd: "0", name: _},
%{type: "PIPE", fd: "1", name: _},
%{type: "CHR", fd: "2", name: "/dev/ttys007"}
] = open_files
end
describe "options and validation" do
test "cd option" do
parent = Path.expand("..", File.cwd!())
{:ok, s} = Process.start_link(~w(sh -c pwd), cd: parent)
{:ok, dir} = Process.read(s)
assert String.trim(dir) == parent
assert {:ok, 0} = Process.await_exit(s)
end
test "when cd is invalid" do
assert {:error, _} = Process.start_link(~w(sh -c pwd), cd: "invalid")
end
test "when user pass invalid option" do
assert {:error, "invalid opts: [invalid: :test]"} =
Process.start_link(~w(cat), invalid: :test)
end
test "env option" do
assert {:ok, s} = Process.start_link(~w(printenv TEST_ENV), env: %{"TEST_ENV" => "test"})
assert {:ok, "test\n"} = Process.read(s)
assert {:ok, 0} = Process.await_exit(s)
end
test "if external process inherits beam env" do
:ok = System.put_env([{"BEAM_ENV_A", "10"}])
assert {:ok, s} = Process.start_link(~w(printenv BEAM_ENV_A))
assert {:ok, "10\n"} = Process.read(s)
assert {:ok, 0} = Process.await_exit(s)
end
test "if user env overrides beam env" do
:ok = System.put_env([{"BEAM_ENV", "base"}])
assert {:ok, s} =
Process.start_link(~w(printenv BEAM_ENV), env: %{"BEAM_ENV" => "overridden"})
assert {:ok, "overridden\n"} = Process.read(s)
assert {:ok, 0} = Process.await_exit(s)
end
end
def start_parallel_reader(process, logger) do
spawn_link(fn ->
:ok = Process.change_pipe_owner(process, :stdout, self())
reader_loop(process, logger)
end)
end
def reader_loop(process, logger) do
case Process.read(process) do
{:ok, data} ->
add_event(logger, {:read, data})
reader_loop(process, logger)
:eof ->
add_event(logger, :eof)
end
end
def start_events_collector do
{:ok, ordered_events} = Agent.start(fn -> [] end)
ordered_events
end
def add_event(agent, event) do
:ok = Agent.update(agent, fn events -> events ++ [event] end)
end
def get_events(agent) do
Agent.get(agent, & &1)
end
defp os_process_alive?(pid) do
match?({_, 0}, System.cmd("ps", ["-p", to_string(pid)]))
end
defp fixture(script) do
Path.join([__DIR__, "../scripts", script])
end
defp parse_lsof(iodata) do
String.split(IO.iodata_to_binary(iodata), "\n", trim: true)
|> Enum.reduce([], fn
"f" <> fd, acc -> [%{fd: fd} | acc]
"t" <> type, [h | acc] -> [Map.put(h, :type, type) | acc]
"n" <> name, [h | acc] -> [Map.put(h, :name, name) | acc]
_, acc -> acc
end)
|> Enum.reverse()
|> Enum.reject(fn
%{fd: fd} when fd in ["255", "cwd", "txt"] ->
true
%{fd: "rtd", name: "/", type: "DIR"} ->
true
# filter libc and friends
%{fd: "mem", type: "REG", name: "/lib/x86_64-linux-gnu/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/C.UTF-8/" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/locale/locale-archive" <> _} ->
true
%{fd: "mem", type: "REG", name: "/usr/lib/x86_64-linux-gnu/gconv" <> _} ->
true
_ ->
false
end)
end
defp generate_binary(size) do
Stream.repeatedly(fn -> "A" end)
|> Enum.take(size)
|> IO.iodata_to_binary()
end
defp block do
rand = :rand.uniform()
receive do
^rand -> :ok
end
end
defp snd(pid, term) do
send(pid, {self(), term})
end
defp recv(sender) do
receive do
{^sender, term} -> term
after
1000 ->
raise "recv timeout"
end
end
end
diff --git a/test/exile_test.exs b/test/exile_test.exs
index 7069d05..c8dd4ae 100644
--- a/test/exile_test.exs
+++ b/test/exile_test.exs
@@ -1,123 +1,123 @@
defmodule ExileTest do
use ExUnit.Case
doctest Exile
test "stream with enumerable" do
proc_stream =
- Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end), enable_stderr: false)
+ Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end), stderr: :console)
stdout = proc_stream |> Enum.to_list()
assert IO.iodata_length(stdout) == 1000
end
test "stream with collectable" do
proc_stream =
Exile.stream!(["cat"], input: fn sink -> Enum.into(1..1000, sink, fn _ -> "a" end) end)
stdout = Enum.to_list(proc_stream)
assert IO.iodata_length(stdout) == 1000
end
test "stream without stdin" do
proc_stream = Exile.stream!(~w(echo hello))
stdout = Enum.to_list(proc_stream)
assert IO.iodata_to_binary(stdout) == "hello\n"
end
test "stderr" do
- proc_stream = Exile.stream!(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: true)
+ proc_stream = Exile.stream!(["sh", "-c", "echo foo >>/dev/stderr"], stderr: :consume)
assert {[], stderr} = split_stream(proc_stream)
assert IO.iodata_to_binary(stderr) == "foo\n"
end
test "multiple streams" do
script = """
for i in {1..1000}; do
echo "foo ${i}"
echo "bar ${i}" >&2
done
"""
- proc_stream = Exile.stream!(["sh", "-c", script], enable_stderr: true)
+ proc_stream = Exile.stream!(["sh", "-c", script], stderr: :consume)
{stdout, stderr} = split_stream(proc_stream)
stdout_lines = String.split(Enum.join(stdout), "\n", trim: true)
stderr_lines = String.split(Enum.join(stderr), "\n", trim: true)
assert length(stdout_lines) == length(stderr_lines)
assert Enum.all?(stdout_lines, &String.starts_with?(&1, "foo "))
assert Enum.all?(stderr_lines, &String.starts_with?(&1, "bar "))
end
test "environment variable" do
output =
Exile.stream!(~w(printenv FOO), env: %{"FOO" => "bar"})
|> Enum.to_list()
|> IO.iodata_to_binary()
assert output == "bar\n"
end
test "premature stream termination" do
input_stream = Stream.map(1..100_000, fn _ -> "hello" end)
assert_raise Exile.Stream.AbnormalExit,
"program exited due to :epipe error",
fn ->
Exile.stream!(~w(cat), input: input_stream)
|> Enum.take(1)
end
end
test "premature stream termination when ignore_epipe is true" do
input_stream = Stream.map(1..100_000, fn _ -> "hello" end)
assert ["hello"] ==
Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true, max_chunk_size: 5)
|> Enum.take(1)
end
test "stream!/2 with exit status" do
proc_stream = Exile.stream!(["sh", "-c", "exit 10"])
assert_raise Exile.Stream.AbnormalExit, "program exited with exit status: 10", fn ->
Enum.to_list(proc_stream)
end
end
test "stream/2 with exit status" do
proc_stream = Exile.stream(["sh", "-c", "exit 10"])
stdout = Enum.to_list(proc_stream)
assert stdout == [{:exit, {:status, 10}}]
end
test "stream!/2 abnormal exit status" do
proc_stream = Exile.stream!(["sh", "-c", "exit 5"])
exit_status =
try do
proc_stream
|> Enum.to_list()
nil
rescue
e in Exile.Stream.AbnormalExit ->
e.exit_status
end
assert exit_status == 5
end
defp split_stream(stream) do
{stdout, stderr} =
Enum.reduce(stream, {[], []}, fn
{:stdout, data}, {stdout, stderr} -> {[data | stdout], stderr}
{:stderr, data}, {stdout, stderr} -> {stdout, [data | stderr]}
end)
{Enum.reverse(stdout), Enum.reverse(stderr)}
end
end
