diff --git a/README.md b/README.md
index b9872ca..043ca3b 100644
--- a/README.md
+++ b/README.md
@@ -1,94 +1,202 @@
# Exile
[![CI](https://github.com/akash-akya/exile/actions/workflows/ci.yaml/badge.svg)](https://github.com/akash-akya/exile/actions/workflows/ci.yaml)
[![Hex.pm](https://img.shields.io/hexpm/v/exile.svg)](https://hex.pm/packages/exile)
[![docs](https://img.shields.io/badge/docs-hexpm-blue.svg)](https://hexdocs.pm/exile/)
Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure and non-blocking IO, and tries to fix the issues with ports.
Exile is built around the idea of demand-driven, asynchronous interaction with an external process. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses a NIF. See [Rationale](#rationale) for details. It also provides a stream abstraction for interacting with an external program. For example, extracting the audio from a video is as simple as
``` elixir
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
See the `Exile.stream!/2` documentation for more details about handling stderr and other options.
`Exile.stream!/2` is a convenience wrapper around `Exile.Process`. Prefer using `Exile.stream!` over using `Exile.Process` directly.
Exile requires OTP v22.1 or above.
Exile is based on a NIF; please understand the consequences of that before using Exile. For basic use cases, use [ExCmd](https://github.com/akash-akya/ex_cmd) instead.
+
## Installation
```elixir
def deps do
[
{:exile, "~> x.x.x"}
]
end
```
+
+## Quick Start
+
+ Run a command and read from stdout
+
+ ```
+ iex> Exile.stream!(~w(echo Hello))
+ ...> |> Enum.into("") # collect as string
+ "Hello\n"
+ ```
+
+ Run a command with a list of strings as input
+
+ ```
+ iex> Exile.stream!(~w(cat), input: ["Hello", " ", "World"])
+ ...> |> Enum.into("") # collect as string
+ "Hello World"
+ ```
+
+ Run a command with input as Stream
+
+ ```
+ iex> input_stream = Stream.map(1..10, fn num -> "#{num} " end)
+ iex> Exile.stream!(~w(cat), input: input_stream)
+ ...> |> Enum.into("")
+ "1 2 3 4 5 6 7 8 9 10 "
+ ```
+
+ Run a command with input as infinite stream
+
+ ```
+ # create infinite stream
+ iex> input_stream = Stream.repeatedly(fn -> "A" end)
+ iex> binary =
+ ...> Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true) # we need to ignore epipe since we are terminating the program before the input completes
+ ...> |> Stream.take(2) # we must limit since the input stream is infinite
+ ...> |> Enum.into("")
+ iex> is_binary(binary)
+ true
+ iex> "AAAAA" <> _ = binary
+ ```
+
+ Run a command with input pushed through a Collectable
+
+ ```
+ # Exile calls the callback with a sink where the process can push the data
+ iex> Exile.stream!(~w(cat), input: fn sink ->
+ ...> Stream.map(1..10, fn num -> "#{num} " end)
+ ...> |> Stream.into(sink) # push to the external process
+ ...> |> Stream.run()
+ ...> end)
+ ...> |> Stream.take(100) # optional cap on the number of output chunks; the input here is finite
+ ...> |> Enum.into("")
+ "1 2 3 4 5 6 7 8 9 10 "
+ ```
+
+ When the command waits for the input stream to close
+
+ ```
+ # the base64 command waits for the input to close and then writes the data to stdout all at once
+ iex> Exile.stream!(~w(base64), input: ["abcdef"])
+ ...> |> Enum.into("")
+ "YWJjZGVm\n"
+ ```
+
+ When the command exits with an error
+
+ ```
+ iex> Exile.stream!(["sh", "-c", "exit 4"])
+ ...> |> Enum.into("")
+ ** (Exile.Process.Error) command exited with status: 4
+ ```
+
+ With `max_chunk_size` set
+
+ ```
+ iex> data =
+ ...> Exile.stream!(~w(cat /dev/urandom), max_chunk_size: 100, ignore_epipe: true)
+ ...> |> Stream.take(5)
+ ...> |> Enum.into("")
+ iex> byte_size(data)
+ 500
+ ```
+
+ When input and output run at different rates
+
+ ```
+ iex> input_stream = Stream.map(1..1000, fn num -> "X #{num} X\n" end)
+ iex> Exile.stream!(~w(grep 250), input: input_stream)
+ ...> |> Enum.into("")
+ "X 250 X\n"
+ ```
+
+ With stderr enabled
+
+ ```
+ iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], enable_stderr: true)
+ ...> |> Enum.to_list()
+ [{:stdout, "foo\n"}, {:stderr, "bar\n"}]
+ ```
+
+ For more details about the stream API, see `Exile.stream!/2`.
+
+ For more details about the inner workings, please check the `Exile.Process`
+ documentation.
+
+
## Rationale
Existing approaches
#### Port
Port is the default way of executing external commands. This is okay when you have control over the external program's implementation and the interaction is minimal. Port has several important issues.
* it can end up creating [zombie processes](https://hexdocs.pm/elixir/Port.html#module-zombie-operating-system-processes)
* it cannot selectively close stdin, which is required when the external program acts on EOF from stdin
* it sends command output as a message to the BEAM process. This does not put back-pressure on the external program and can lead to exhausting VM memory
#### Middleware based solutions
Libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. solve the first two issues associated with ports: zombie processes and selectively closing stdin. They do not solve the third issue, back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally they use a port for reading the output and writing the input. Note that these libraries solve a different subset of issues and have different functionality; please check the relevant project pages for details.
* no back-pressure
* additional OS process (middleware) for every execution of your program
* in a few cases, such as Porcelain, the user has to install this external program explicitly
* might not be suitable when the program requires constant communication between the BEAM process and the external program
On the plus side, unlike Exile, bugs in these implementations do not bring down the whole BEAM VM.
#### [ExCmd](https://github.com/akash-akya/ex_cmd)
This is my other attempt at solving the back-pressure issue for external programs. It implements a demand-driven protocol using [odu](https://github.com/akash-akya/odu) to solve this. Since ExCmd is also a port-based solution, the concerns mentioned previously apply to ExCmd too.
## Exile
Internally, Exile uses non-blocking, asynchronous system calls to interact with the external process. Instead of the port's message-based communication, it does raw stdio through a NIF. Most of the system calls are non-blocking, so they should not block the BEAM schedulers. Exile makes use of dirty schedulers for IO.
**Highlights**
* Back pressure
* no middleware program
* no additional os process. No performance/resource cost
* no need to install any external command
* tries to handle zombie processes by attempting to clean up the external process. *But* since there is no middleware involved with Exile, it is still possible to end up with a zombie process if the program misbehaves.
* stream abstraction
* selectively consume stdout and stderr streams
If you are running a huge number of external programs **concurrently** (more than a few hundred), you might have to increase the open file descriptor limit (`ulimit -n`).
Non-blocking IO can be used for other interesting things, such as reading named pipe (FIFO) files. `Exile.stream!(~w(cat data.pipe))` does not block schedulers, so you can open hundreds of FIFO files, unlike with the default `file`-based IO.
#### TODO
* add benchmarks results
### 🚨 Obligatory NIF warning
As with any NIF-based solution, bugs or issues in the Exile implementation **can bring down the BEAM VM**. But the NIF implementation is comparatively small and mostly uses POSIX system calls. Also, spawned external processes are still completely isolated at the OS level.
If all you want is to run a command with no communication, then just sticking with `System.cmd` is a better option.
### License
Copyright (c) 2020 Akash Hiremath.
Exile source code is released under Apache License 2.0. Check [LICENSE](LICENSE.md) for more information.
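
The demand-driven behaviour the README describes can be sketched with plain `Stream.resource/3`, which `Exile.Stream` also uses for its `Enumerable` implementation. The snippet below does not call Exile: `counter` and the `"chunk"` payload are hypothetical stand-ins for a read from an external program. It only illustrates that the producer is asked for data when the consumer demands it, and is no longer called once the consumer halts.

```elixir
# Counts how many times the producer was asked for data.
{:ok, counter} = Agent.start_link(fn -> 0 end)

# Hypothetical producer: each call to the second function stands in
# for one read from an external program.
chunks =
  Stream.resource(
    fn -> :open end,
    fn state ->
      Agent.update(counter, &(&1 + 1))
      {["chunk"], state}
    end,
    fn _state -> :ok end
  )

# The consumer only demands three elements, so the producer is only
# invoked three times even though it could produce data forever.
taken = Enum.take(chunks, 3)
reads = Agent.get(counter, & &1)

IO.inspect({taken, reads})
```

With a port, the external program would keep sending messages regardless of how much the consumer has taken; here the number of reads follows the demand.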
diff --git a/lib/exile.ex b/lib/exile.ex
index 7bd3545..b45ac32 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,139 +1,227 @@
defmodule Exile do
@moduledoc ~S"""
Exile is an alternative for beam [ports](https://hexdocs.pm/elixir/Port.html)
with back-pressure and non-blocking IO.
### Quick Start
- Read from stdout
+ Run a command and read from stdout
```
- iex> random_data =
+ iex> Exile.stream!(~w(echo Hello))
+ ...> |> Enum.into("") # collect as string
+ "Hello\n"
+ ```
+
+ Run a command with a list of strings as input
+
+ ```
+ iex> Exile.stream!(~w(cat), input: ["Hello", " ", "World"])
+ ...> |> Enum.into("") # collect as string
+ "Hello World"
+ ```
+
+ Run a command with input as Stream
+
+ ```
+ iex> input_stream = Stream.map(1..10, fn num -> "#{num} " end)
+ iex> Exile.stream!(~w(cat), input: input_stream)
+ ...> |> Enum.into("")
+ "1 2 3 4 5 6 7 8 9 10 "
+ ```
+
+ Run a command with input as infinite stream
+
+ ```
+ # create infinite stream
+ iex> input_stream = Stream.repeatedly(fn -> "A" end)
+ iex> binary =
+ ...> Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true) # we need to ignore epipe since we are terminating the program before the input completes
+ ...> |> Stream.take(2) # we must limit since the input stream is infinite
+ ...> |> Enum.into("")
+ iex> is_binary(binary)
+ true
+ iex> "AAAAA" <> _ = binary
+ ```
+
+ Run a command with input pushed through a Collectable
+
+ ```
+ # Exile calls the callback with a sink where the process can push the data
+ iex> Exile.stream!(~w(cat), input: fn sink ->
+ ...> Stream.map(1..10, fn num -> "#{num} " end)
+ ...> |> Stream.into(sink) # push to the external process
+ ...> |> Stream.run()
+ ...> end)
+ ...> |> Stream.take(100) # optional cap on the number of output chunks; the input here is finite
+ ...> |> Enum.into("")
+ "1 2 3 4 5 6 7 8 9 10 "
+ ```
+
+ When the command waits for the input stream to close
+
+ ```
+ # the base64 command waits for the input to close and then writes the data to stdout all at once
+ iex> Exile.stream!(~w(base64), input: ["abcdef"])
+ ...> |> Enum.into("")
+ "YWJjZGVm\n"
+ ```
+
+ When the command exits with an error
+
+ ```
+ iex> Exile.stream!(["sh", "-c", "exit 4"])
+ ...> |> Enum.into("")
+ ** (Exile.Process.Error) command exited with status: 4
+ ```
+
+ With `max_chunk_size` set
+
+ ```
+ iex> data =
...> Exile.stream!(~w(cat /dev/urandom), max_chunk_size: 100, ignore_epipe: true)
- ...> |> Enum.take(5)
- iex> byte_size(IO.iodata_to_binary(random_data))
+ ...> |> Stream.take(5)
+ ...> |> Enum.into("")
+ iex> byte_size(data)
500
```
- For more details about stream API, see `Exile.stream!`.
+ When input and output run at different rates
+
+ ```
+ iex> input_stream = Stream.map(1..1000, fn num -> "X #{num} X\n" end)
+ iex> Exile.stream!(~w(grep 250), input: input_stream)
+ ...> |> Enum.into("")
+ "X 250 X\n"
+ ```
+
+ With stderr enabled
+
+ ```
+ iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], enable_stderr: true)
+ ...> |> Enum.to_list()
+ [{:stdout, "foo\n"}, {:stderr, "bar\n"}]
+ ```
+
+ For more details about the stream API, see `Exile.stream!/2`.
For more details about the inner workings, please check the `Exile.Process`
documentation.
"""
use Application
@doc false
def start(_type, _args) do
opts = [
name: Exile.WatcherSupervisor,
strategy: :one_for_one
]
# We use DynamicSupervisor for cleaning up external processes on
# :init.stop or SIGTERM
DynamicSupervisor.start_link(opts)
end
@doc ~S"""
Runs the command with arguments and returns the stdout as a lazy
Enumerable stream, similar to [`Stream`](https://hexdocs.pm/elixir/Stream.html).
First parameter must be a list containing command with arguments.
Example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* Enumerable:
```
# List
Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
# Stream
Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65_536)) |> Enum.to_list()
```
* Collectable:
If the input is a function with arity 1, Exile will call that function
with a `Collectable` as the argument. The function must *push* input to this
collectable. The return value of the function is ignored.
```
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
By default, no input is sent to the command.
* `exit_timeout` - Duration to wait for the external program to exit after completion
(when the stream ends). Defaults to `5000`
* `max_chunk_size` - Maximum size of the iodata chunk emitted by the stream.
Chunk size can be less than `max_chunk_size` depending on the amount of
data available to be read. Defaults to `65_536`
* `enable_stderr` - When set to true, output stream will contain stderr data along
with stdout. Stream data will be of the form `{:stdout, iodata}` or `{:stderr, iodata}`
to differentiate different streams. Defaults to false. See example below
* `ignore_epipe` - When set to true, the reader can exit early without raising an error.
Typically the writer gets an `EPIPE` error on write when the program terminates prematurely.
With `ignore_epipe` set to true, this error is ignored. This can be used to
match the default behaviour of UNIX shells. `EPIPE` is the error raised when the reader finishes
reading and closes the output pipe before the command completes. Defaults to `false`.
Remaining options are passed to `Exile.Process.start_link/2`
### Examples
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
Stream with stderr
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1),
input: File.stream!("music_video.mkv", [], 65_535),
enable_stderr: true
)
|> Stream.transform(
fn ->
File.open!("music.mp3", [:write, :binary])
end,
fn elem, file ->
case elem do
{:stdout, data} ->
# write stdout data to a file
:ok = IO.binwrite(file, data)
{:stderr, msg} ->
# write stderr output to console
:ok = IO.write(msg)
end
{[], file}
end,
fn file ->
:ok = File.close(file)
end
)
|> Stream.run()
```
"""
@type collectable_func() :: (Collectable.t() -> any())
@spec stream!(nonempty_list(String.t()),
input: Enum.t() | collectable_func(),
exit_timeout: timeout(),
max_chunk_size: pos_integer()
) :: Exile.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, opts)
end
end
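
The `:input` callback form documented above hands the caller a `Collectable` and expects the caller to push data into it. A minimal sketch of that contract follows, assuming a hypothetical `DemoSink` struct that forwards each chunk to a process mailbox instead of to an external program's stdin; it is not Exile's `Sink`, only an illustration of the shape of the protocol.

```elixir
defmodule DemoSink do
  # Hypothetical stand-in for the sink passed to the `:input` callback.
  defstruct [:pid]

  defimpl Collectable do
    def into(%{pid: pid} = sink) do
      collector = fn
        # Every pushed element is forwarded to the receiving process.
        acc, {:cont, chunk} ->
          send(pid, {:chunk, chunk})
          acc

        acc, :done ->
          acc

        _acc, :halt ->
          :ok
      end

      {sink, collector}
    end
  end
end

# Same shape as a function given as `input:`: it receives a sink and
# pushes data into it; its return value is not used.
push = fn sink -> Enum.into(1..3, sink, &to_string/1) end
push.(%DemoSink{pid: self()})

received =
  for _ <- 1..3 do
    receive do
      {:chunk, chunk} -> chunk
    after
      1_000 -> :timeout
    end
  end

IO.inspect(received)
```

Because the caller drives the writes, anything that can be piped into a `Collectable` (`Enum.into/2`, `Stream.into/2`) can feed the command.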
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index c58cb32..a4aa52f 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,266 +1,267 @@
defmodule Exile.Stream do
@moduledoc """
Defines an `Exile.Stream` struct returned by `Exile.stream!/2`.
"""
alias Exile.Process
alias Exile.Process.Error
require Logger
defmodule Sink do
@moduledoc false
@type t :: %__MODULE__{process: Process.t(), ignore_epipe: boolean}
defstruct [:process, :ignore_epipe]
defimpl Collectable do
def into(%{process: process}) do
collector_fun = fn
:ok, {:cont, x} ->
case Process.write(process, x) do
{:error, :epipe} ->
# there is no other way to stop a Collectable than to
# raise an error; we catch this error and return `{:error, :epipe}`
raise Error, "epipe"
:ok ->
:ok
end
acc, :done ->
acc
acc, :halt ->
acc
end
{:ok, collector_fun}
end
end
end
defstruct [:process, :stream_opts, :writer_task]
@typedoc "Struct members are private, do not depend on them"
@type t :: %__MODULE__{process: Process.t(), stream_opts: map, writer_task: Task.t()}
@doc false
@spec __build__(nonempty_list(String.t()), keyword()) :: t()
def __build__(cmd_with_args, opts) do
{stream_opts, process_opts} =
Keyword.split(opts, [:exit_timeout, :max_chunk_size, :input, :enable_stderr, :ignore_epipe])
case normalize_stream_opts(stream_opts) do
{:ok, stream_opts} ->
process_opts = Keyword.put(process_opts, :enable_stderr, stream_opts[:enable_stderr])
{:ok, process} = Process.start_link(cmd_with_args, process_opts)
writer_task =
start_input_streamer(
%Sink{process: process, ignore_epipe: stream_opts[:ignore_epipe]},
stream_opts.input
)
%Exile.Stream{process: process, stream_opts: stream_opts, writer_task: writer_task}
{:error, error} ->
raise ArgumentError, message: error
end
end
@doc false
@spec start_input_streamer(term, term) :: Task.t()
defp start_input_streamer(%Sink{process: process} = sink, input) do
case input do
:no_input ->
# use `Task.completed(:ok)` when bumping min Elixir requirement
Task.async(fn -> :ok end)
{:enumerable, enum} ->
Task.async(fn ->
Process.change_pipe_owner(process, :stdin, self())
try do
Enum.into(enum, sink)
rescue
Error ->
{:error, :epipe}
end
end)
{:collectable, func} ->
Task.async(fn ->
Process.change_pipe_owner(process, :stdin, self())
try do
func.(sink)
rescue
Error ->
{:error, :epipe}
end
end)
end
end
defimpl Enumerable do
+ # credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def reduce(arg, acc, fun) do
%{
process: process,
stream_opts:
%{
enable_stderr: enable_stderr,
ignore_epipe: ignore_epipe
} = stream_opts,
writer_task: writer_task
} = arg
start_fun = fn -> :premature_exit end
next_fun = fn :premature_exit ->
case Process.read_any(process, stream_opts.max_chunk_size) do
:eof ->
{:halt, :normal_exit}
{:ok, {:stdout, x}} when enable_stderr == false ->
{[IO.iodata_to_binary(x)], :premature_exit}
{:ok, {stream, x}} when enable_stderr == true ->
{[{stream, IO.iodata_to_binary(x)}], :premature_exit}
{:error, errno} ->
raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
end
end
after_fun = fn exit_type ->
result = Process.await_exit(process, stream_opts.exit_timeout)
writer_task_status = Task.await(writer_task)
case {exit_type, result, writer_task_status} do
# if the reader exits early and there is a pending write
{:premature_exit, {:ok, _status}, {:error, :epipe}} when ignore_epipe ->
:ok
# if the reader exits early and there is no pending write, or if
# there is no writer
{:premature_exit, {:ok, _status}, :ok} when ignore_epipe ->
:ok
# if we get epipe from writer then raise that error, and ignore exit status
{:premature_exit, {:ok, _status}, {:error, :epipe}} when ignore_epipe == false ->
raise Error, "abnormal command exit, received EPIPE while writing to stdin"
# Normal exit success case
{_, {:ok, 0}, _} ->
:ok
{:normal_exit, {:ok, error}, _} ->
raise Error, "command exited with status: #{inspect(error)}"
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
def count(_stream) do
{:error, __MODULE__}
end
def member?(_stream, _term) do
{:error, __MODULE__}
end
def slice(_stream) do
{:error, __MODULE__}
end
end
@spec normalize_input(term) ::
{:ok, :no_input} | {:ok, {:enumerable, term}} | {:ok, {:collectable, function}}
defp normalize_input(term) do
cond do
is_nil(term) ->
{:ok, :no_input}
!is_function(term, 1) && Enumerable.impl_for(term) ->
{:ok, {:enumerable, term}}
is_function(term, 1) ->
{:ok, {:collectable, term}}
true ->
{:error, "`:input` must be either Enumerable or a function which accepts collectable"}
end
end
defp normalize_max_chunk_size(max_chunk_size) do
case max_chunk_size do
nil ->
{:ok, 65_536}
max_chunk_size when is_integer(max_chunk_size) and max_chunk_size > 0 ->
{:ok, max_chunk_size}
_ ->
{:error, ":max_chunk_size must be a positive integer"}
end
end
defp normalize_exit_timeout(timeout) do
case timeout do
nil ->
{:ok, 5000}
timeout when is_integer(timeout) and timeout > 0 ->
{:ok, timeout}
_ ->
{:error, ":exit_timeout must be either :infinity or an integer"}
end
end
defp normalize_enable_stderr(enable_stderr) do
case enable_stderr do
nil ->
{:ok, false}
enable_stderr when is_boolean(enable_stderr) ->
{:ok, enable_stderr}
_ ->
{:error, ":enable_stderr must be a boolean"}
end
end
defp normalize_ignore_epipe(ignore_epipe) do
case ignore_epipe do
nil ->
{:ok, false}
ignore_epipe when is_boolean(ignore_epipe) ->
{:ok, ignore_epipe}
_ ->
{:error, ":ignore_epipe must be a boolean"}
end
end
defp normalize_stream_opts(opts) do
with {:ok, input} <- normalize_input(opts[:input]),
{:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]),
{:ok, max_chunk_size} <- normalize_max_chunk_size(opts[:max_chunk_size]),
{:ok, enable_stderr} <- normalize_enable_stderr(opts[:enable_stderr]),
{:ok, ignore_epipe} <- normalize_ignore_epipe(opts[:ignore_epipe]) do
{:ok,
%{
input: input,
exit_timeout: exit_timeout,
max_chunk_size: max_chunk_size,
enable_stderr: enable_stderr,
ignore_epipe: ignore_epipe
}}
end
end
end
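
The error message in `normalize_exit_timeout/1` mentions `:infinity`, while the guard shown in the diff only matches positive integers. A minimal sketch of a variant that accepts both is given below. `TimeoutSketch` is a hypothetical module, not part of Exile, and the `5000` default is copied from the diff as an assumption about the intended fallback.

```elixir
defmodule TimeoutSketch do
  # Hypothetical helper: normalizes an `:exit_timeout` option so that the
  # accepted values match what the error message advertises.
  @default_timeout 5000

  def normalize_exit_timeout(nil), do: {:ok, @default_timeout}
  def normalize_exit_timeout(:infinity), do: {:ok, :infinity}

  def normalize_exit_timeout(timeout) when is_integer(timeout) and timeout > 0,
    do: {:ok, timeout}

  def normalize_exit_timeout(_other),
    do: {:error, ":exit_timeout must be either :infinity or a positive integer"}
end

results = Enum.map([nil, :infinity, 100, -1], &TimeoutSketch.normalize_exit_timeout/1)
IO.inspect(results)
```

Multi-clause functions keep each accepted shape on its own line, which makes it easier to see at a glance which values pass validation.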