diff --git a/README.md b/README.md
index 7a20e32..42f8d5c 100644
--- a/README.md
+++ b/README.md
@@ -1,78 +1,78 @@
# Exile
Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure and non-blocking IO, and tries to fix issues with ports.
Exile is built around the idea of demand-driven, asynchronous interaction with an external process. Think of streaming a video through `ffmpeg` to serve a web request. Exile internally uses a NIF. See [Rationale](#rationale) for details. It also provides a stream abstraction for interacting with an external program. For example, getting audio out of a stream is as simple as
``` elixir
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
-`Exile.stream!` is a convenience wrapper around `Exile.Process`. Prefer using `Exile.stream!` over using `Exile.Process` directly.
+`Exile.stream!/2` is a convenience wrapper around `Exile.Process`. Prefer using `Exile.stream!` over using `Exile.Process` directly.
Exile requires OTP v22.1 and above.
**Exile is experimental and still a work in progress. Exile is based on a NIF; please understand the implications before using it.**
## Rationale
Existing approaches
#### Port
Port is the default way of executing external commands. This is okay when you have control over the external program's implementation and the interaction is minimal. Port has several important issues.
* it can end up creating [zombie processes](https://hexdocs.pm/elixir/Port.html#module-zombie-operating-system-processes)
* it cannot selectively close stdin, which is required when the external program acts on EOF from stdin
* it sends command output as messages to the BEAM process. This puts no back-pressure on the external program and can lead to exhausting VM memory
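The third point can be seen with a plain port. The following is a minimal sketch, assuming a Unix-like system with `echo` on the `PATH`; `PortDemo` is a hypothetical module name used only for illustration. The port pushes output into the owner's mailbox as `{port, {:data, chunk}}` messages whether or not the owner is ready for them, which is why a fast producer can fill VM memory.

```elixir
defmodule PortDemo do
  # Hypothetical helper: runs `echo hello` through a plain port and
  # collects everything the port pushes into our mailbox.
  def run do
    echo = System.find_executable("echo")

    port =
      Port.open({:spawn_executable, echo}, [:binary, :exit_status, args: ["hello"]])

    collect(port, [])
  end

  # Output arrives as messages; nothing here can slow the producer down.
  defp collect(port, acc) do
    receive do
      {^port, {:data, chunk}} ->
        collect(port, [acc, chunk])

      {^port, {:exit_status, status}} ->
        {status, IO.iodata_to_binary(acc)}
    end
  end
end

{status, output} = PortDemo.run()
```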
#### Middleware based solutions
Libraries such as [Porcelain](https://github.com/alco/porcelain/), [Erlexec](https://github.com/saleyn/erlexec), [Rambo](https://github.com/jayjun/rambo), etc. solve the first two issues associated with ports: zombie processes and selectively closing stdin. They do not solve the third issue, back-pressure. At a high level, these libraries solve port issues by spawning an external middleware program which in turn spawns the program we want to run. Internally, they use a port for reading the output and writing the input. Note that these libraries solve different subsets of issues and have different functionality; please check the relevant project page for details.
* no back-pressure
* additional os process (middleware) for every execution of your program
* in a few cases, such as Porcelain, the user has to install this external program explicitly
* might not be suitable when the program requires constant communication between beam process and external program
On the plus side, unlike Exile, bugs in the implementation do not bring down the whole BEAM VM.
#### [ExCmd](https://github.com/akash-akya/ex_cmd)
This is my other stab at solving the back-pressure issue with external programs. It implements a demand-driven protocol using [odu](https://github.com/akash-akya/odu) to solve this. Since ExCmd is also a port-based solution, the concerns mentioned previously apply to ExCmd too.
## Exile
Internally, Exile uses non-blocking asynchronous system calls to interact with the external process. It does not use the port's message-based communication; instead it does raw stdio using a NIF. Most of the system calls are non-blocking, so it should not block the BEAM schedulers. It makes use of dirty schedulers for IO.
**Highlights**
* Back pressure
* no middleware program
* no additional os process. No performance/resource cost
* no need to install any external command
* tries to handle zombie processes by attempting to clean up the external process. *But* as there is no middleware involved with Exile, it is still possible to end up with a zombie process if the program misbehaves.
* stream abstraction
* selectively consume stdout and stderr streams
If you are running a huge number of external programs **concurrently** (more than a few hundred), you might have to increase the open file descriptor limit (`ulimit -n`).
Non-blocking IO can be used for other interesting things, such as reading named pipe (FIFO) files. `Exile.stream!(~w(cat data.pipe))` does not block schedulers, so you can open hundreds of FIFO files, unlike with the default `file`-based IO.
#### TODO
* add benchmarks results
### 🚨 Obligatory NIF warning
As with any NIF-based solution, bugs or issues in the Exile implementation **can bring down the BEAM VM**. But the NIF implementation is comparatively small and mostly uses POSIX system calls. Also, spawned external processes are still completely isolated at the OS level.
If all you want is to run a command with no communication, then just sticking with `System.cmd` is a better option.
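For that simple case, a minimal sketch using only the standard library might look like the following. It assumes a Unix-like system with `echo` on the `PATH`. `System.cmd/3` collects the whole output in memory and returns it together with the exit status, so it suits small outputs with no input streaming.

```elixir
# Run a command to completion; no streaming and no back-pressure involved.
{output, exit_status} = System.cmd("echo", ["hello"])

# Treat a non-zero exit status as a failure.
if exit_status != 0 do
  raise "echo failed with status #{exit_status}"
end

IO.write(output)
```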
### License
Copyright (c) 2020 Akash Hiremath.
Exile source code is released under Apache License 2.0. Check [LICENSE](LICENSE.md) for more information.
diff --git a/lib/exile.ex b/lib/exile.ex
index c0598e6..d90c974 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,93 +1,93 @@
defmodule Exile do
@moduledoc """
Exile is an alternative for beam ports with back-pressure and non-blocking IO
"""
use Application
@doc false
def start(_type, _args) do
opts = [
name: Exile.WatcherSupervisor,
strategy: :one_for_one
]
# we use DynamicSupervisor for cleaning up external processes on
# :init.stop or SIGTERM
DynamicSupervisor.start_link(opts)
end
@doc """
Runs the given command with arguments and returns an Enumerable to read the command output.
First parameter must be a list containing command with arguments. example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* Enumerable:
```
# List
Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
# Stream
Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65536)) |> Enum.to_list()
```
* Collectable:
If the input is a function with arity 1, Exile will call that function with a `Collectable` as the argument. The function must *push* input to this collectable. The return value of the function is ignored.
```
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
By default, no input is given to the command
* `exit_timeout` - Duration to wait for external program to exit after completion before raising an error. Defaults to `:infinity`
* `max_chunk_size` - Maximum size of each iodata chunk emitted by the stream. The chunk size varies depending on the amount of data available at that time. Defaults to 65536
* `use_stderr` - When set to true, the stream will contain stderr output along with stdout output. Elements of the stream will be of the form `{:stdout, iodata}` or `{:stderr, iodata}` to differentiate the two streams. Defaults to false. See example below
- All other options are passed to `Exile.Process.start_link/3`
+ All other options are passed to `Exile.Process.start_link/2`
### Examples
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
Stream with stderr
```
script = \"""
for i in {1..10}; do
echo "foo ${i}"
echo "bar ${i}" >&2
done
\"""
Exile.stream!(["sh", "-c", script], use_stderr: true)
|> Enum.each(fn {stream, lines} ->
String.split(lines, "\\n", trim: true)
|> Enum.each(fn line -> IO.puts("\#{stream}: \#{line}") end)
end)
```
"""
@type collectable_func() :: (Collectable.t() -> any())
@spec stream!(nonempty_list(String.t()),
input: Enum.t() | collectable_func(),
exit_timeout: timeout(),
max_chunk_size: pos_integer()
) :: Exile.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index 1864428..4042dba 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,203 +1,205 @@
defmodule Exile.Stream do
@moduledoc """
- Defines a `Exile.Stream` struct returned by `Exile.stream!/3`.
+ Defines a `Exile.Stream` struct returned by `Exile.stream!/2`.
"""
alias Exile.Process
alias Exile.Process.Error
defmodule Sink do
+ @moduledoc false
+
defstruct [:process]
defimpl Collectable do
def into(%{process: process} = stream) do
collector_fun = fn
:ok, {:cont, x} ->
:ok = Process.write(process, x)
:ok, :done ->
:ok = Process.close_stdin(process)
stream
:ok, :halt ->
:ok = Process.close_stdin(process)
end
{:ok, collector_fun}
end
end
end
defstruct [:process, :stream_opts]
@type t :: %__MODULE__{}
@doc false
def __build__(cmd_with_args, opts) do
{stream_opts, process_opts} =
Keyword.split(opts, [:exit_timeout, :max_chunk_size, :input, :use_stderr])
with {:ok, stream_opts} <- normalize_stream_opts(stream_opts) do
process_opts = Keyword.put(process_opts, :use_stderr, stream_opts[:use_stderr])
{:ok, process} = Process.start_link(cmd_with_args, process_opts)
start_input_streamer(%Sink{process: process}, stream_opts.input)
%Exile.Stream{process: process, stream_opts: stream_opts}
else
{:error, error} -> raise ArgumentError, message: error
end
end
@doc false
defp start_input_streamer(sink, input) do
case input do
:no_input ->
:ok
{:enumerable, enum} ->
spawn_link(fn ->
Enum.into(enum, sink)
end)
{:collectable, func} ->
spawn_link(fn ->
func.(sink)
end)
end
end
defimpl Enumerable do
def reduce(arg, acc, fun) do
%{process: process, stream_opts: %{use_stderr: use_stderr} = stream_opts} = arg
start_fun = fn -> :normal end
next_fun = fn :normal ->
case Process.read_any(process, stream_opts.max_chunk_size) do
:eof ->
{:halt, :normal}
{:ok, {:stdout, x}} when use_stderr == false ->
{[IO.iodata_to_binary(x)], :normal}
{:ok, {stream, x}} when use_stderr == true ->
{[{stream, IO.iodata_to_binary(x)}], :normal}
{:error, errno} ->
raise Error, "Failed to read from the external process. errno: #{errno}"
end
end
after_fun = fn exit_type ->
try do
# always close stdin before stopping to give the command a chance to exit properly
Process.close_stdin(process)
result = Process.await_exit(process, stream_opts.exit_timeout)
case {exit_type, result} do
{_, :timeout} ->
Process.kill(process, :sigkill)
raise Error, "command failed to exit within timeout: #{stream_opts[:exit_timeout]}"
{:normal, {:ok, {:exit, 0}}} ->
:ok
{:normal, {:ok, error}} ->
raise Error, "command exited with status: #{inspect(error)}"
{exit_type, error} ->
Process.kill(process, :sigkill)
raise Error, "command exited with exit_type: #{exit_type}, error: #{inspect(error)}"
end
after
Process.stop(process)
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
def count(_stream) do
{:error, __MODULE__}
end
def member?(_stream, _term) do
{:error, __MODULE__}
end
def slice(_stream) do
{:error, __MODULE__}
end
end
defp normalize_input(term) do
cond do
is_nil(term) ->
{:ok, :no_input}
!is_function(term) && Enumerable.impl_for(term) ->
{:ok, {:enumerable, term}}
is_function(term, 1) ->
{:ok, {:collectable, term}}
true ->
{:error, "`:input` must be either Enumerable or a function which accepts collectable"}
end
end
defp normalize_max_chunk_size(max_chunk_size) do
case max_chunk_size do
nil ->
{:ok, 65536}
max_chunk_size when is_integer(max_chunk_size) and max_chunk_size > 0 ->
{:ok, max_chunk_size}
_ ->
{:error, ":max_chunk_size must be a positive integer"}
end
end
defp normalize_exit_timeout(timeout) do
case timeout do
timeout when timeout in [nil, :infinity] ->
{:ok, :infinity}
timeout when is_integer(timeout) and timeout > 0 ->
{:ok, timeout}
_ ->
{:error, ":exit_timeout must be either :infinity or an integer"}
end
end
defp normalize_use_stderr(use_stderr) do
case use_stderr do
nil ->
{:ok, false}
use_stderr when is_boolean(use_stderr) ->
{:ok, use_stderr}
_ ->
{:error, ":use_stderr must be a boolean"}
end
end
defp normalize_stream_opts(opts) when is_list(opts) do
with {:ok, input} <- normalize_input(opts[:input]),
{:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]),
{:ok, max_chunk_size} <- normalize_max_chunk_size(opts[:max_chunk_size]),
{:ok, use_stderr} <- normalize_use_stderr(opts[:use_stderr]) do
{:ok,
%{
input: input,
exit_timeout: exit_timeout,
max_chunk_size: max_chunk_size,
use_stderr: use_stderr
}}
end
end
defp normalize_stream_opts(_), do: {:error, "stream_opts must be a keyword list"}
end
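
The `Collectable` contract that `Exile.Stream.Sink` relies on can be sketched without Exile at all. The following is a minimal, self-contained sketch and not Exile's implementation: `ToySink` and its `agent` field are hypothetical stand-ins that record chunks in an `Agent` instead of writing them to an external process. It shows the three collector commands (`{:cont, x}`, `:done`, `:halt`) and how a function passed as `:input` pushes data into the sink it receives.

```elixir
defmodule ToySink do
  # Hypothetical stand-in for a process-backed sink: chunks are stored
  # in an Agent rather than written to an external program's stdin.
  defstruct [:agent]

  defimpl Collectable do
    def into(%{agent: agent} = sink) do
      collector = fn
        # Each pushed element is "written"; the accumulator stays :ok.
        :ok, {:cont, chunk} ->
          :ok = Agent.update(agent, fn chunks -> [chunk | chunks] end)

        # Input finished normally: a real sink would close stdin here.
        :ok, :done ->
          sink

        # Input aborted: a real sink would also close stdin here.
        :ok, :halt ->
          :ok
      end

      {:ok, collector}
    end
  end
end

{:ok, agent} = Agent.start_link(fn -> [] end)

# Same shape as a function given as `:input`: it receives the sink
# and pushes data into it.
push = fn sink -> Enum.into(1..3, sink, &to_string/1) end
push.(%ToySink{agent: agent})

written = agent |> Agent.get(& &1) |> Enum.reverse()
```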
