Page MenuHomePhorge

No OneTemporary

Size
22 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/exile.ex b/lib/exile.ex
index 38a97ef..30aa502 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,272 +1,277 @@
defmodule Exile do
@moduledoc ~S"""
Exile is an alternative for beam [ports](https://hexdocs.pm/elixir/Port.html)
with back-pressure and non-blocking IO.
### Quick Start
Run a command and read from stdout
```
iex> Exile.stream!(~w(echo Hello))
...> |> Enum.into("") # collect as string
"Hello\n"
```
Run a command with list of strings as input
```
iex> Exile.stream!(~w(cat), input: ["Hello", " ", "World"])
...> |> Enum.into("") # collect as string
"Hello World"
```
Run a command with input as Stream
```
iex> input_stream = Stream.map(1..10, fn num -> "#{num} " end)
iex> Exile.stream!(~w(cat), input: input_stream)
...> |> Enum.into("")
"1 2 3 4 5 6 7 8 9 10 "
```
Run a command with input as infinite stream
```
# create infinite stream
iex> input_stream = Stream.repeatedly(fn -> "A" end)
iex> binary =
...> Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true) # we need to ignore epipe since we are terminating the program before the input completes
...> |> Stream.take(2) # we must limit since the input stream is infinite
...> |> Enum.into("")
iex> is_binary(binary)
true
iex> "AAAAA" <> _ = binary
```
Run a command with input as a Collectable
```
# Exile calls the callback with a sink where the process can push the data
iex> Exile.stream!(~w(cat), input: fn sink ->
...> Stream.map(1..10, fn num -> "#{num} " end)
...> |> Stream.into(sink) # push to the external process
...> |> Stream.run()
...> end)
...> |> Stream.take(100) # optional here: the input above is finite
...> |> Enum.into("")
"1 2 3 4 5 6 7 8 9 10 "
```
When the command waits for the input stream to close
```
# base64 command waits for the input to close and writes data to stdout at once
iex> Exile.stream!(~w(base64), input: ["abcdef"])
...> |> Enum.into("")
"YWJjZGVm\n"
```
- When the command exit with an error
+ `stream!/2` raises non-zero exit as error
```
- iex> Exile.stream!(["sh", "-c", "exit 4"])
- ...> |> Enum.into("")
- ** (Exile.Process.Error) command exited with status: 4
+ iex> Exile.stream!(["sh", "-c", "echo 'foo' && exit 10"])
+ ...> |> Enum.to_list()
+ ** (Exile.Stream.AbnormalExit) program exited with exit status: 10
+ ```
+
+ `stream/2` variant returns exit status as last element
+
+ ```
+ iex> Exile.stream(["sh", "-c", "echo 'foo' && exit 10"])
+ ...> |> Enum.to_list()
+ [
+ "foo\n",
+ {:exit, {:status, 10}} # returns exit status of the program as last element
+ ]
+ ```
+
+ You can fetch exit_status from the error for `stream!/2`
+
+ ```
+ iex> try do
+ ...> Exile.stream!(["sh", "-c", "exit 10"])
+ ...> |> Enum.to_list()
+ ...> rescue
+ ...> e in Exile.Stream.AbnormalExit ->
+ ...> e.exit_status
+ ...> end
+ 10
```
With `max_chunk_size` set
```
iex> data =
...> Exile.stream!(~w(cat /dev/urandom), max_chunk_size: 100, ignore_epipe: true)
...> |> Stream.take(5)
...> |> Enum.into("")
iex> byte_size(data)
500
```
When input and output run at different rates
```
iex> input_stream = Stream.map(1..1000, fn num -> "X #{num} X\n" end)
iex> Exile.stream!(~w(grep 250), input: input_stream)
...> |> Enum.into("")
"X 250 X\n"
```
With stderr enabled
```
iex> Exile.stream!(["sh", "-c", "echo foo\necho bar >> /dev/stderr"], enable_stderr: true)
...> |> Enum.to_list()
[{:stdout, "foo\n"}, {:stderr, "bar\n"}]
```
- `stream!/2` variant raises non-zero exit as error
-
- ```
- iex> Exile.stream!(["sh", "-c", "echo 'foo' && exit 10"])
- ...> |> Enum.to_list()
- ** (Exile.Process.Error) command exited with status: 10
- ```
-
- `stream/2` variant returns exit status as last element
-
- ```
- iex> Exile.stream(["sh", "-c", "echo 'foo' && exit 10"])
- ...> |> Enum.to_list()
- [
- "foo\n",
- {:exit, {:status, 10}} # returns exit status of the program as last element
- ]
- ```
-
For more details about stream API, see `Exile.stream!/2` and `Exile.stream/2`.
For more details about inner working, please check `Exile.Process`
documentation.
"""
use Application
@doc false
@impl true
def start(_type, _args) do
  # A DynamicSupervisor (rather than a plain Supervisor) is used for cleaning
  # up external processes on :init.stop or SIGTERM.
  DynamicSupervisor.start_link(strategy: :one_for_one, name: Exile.WatcherSupervisor)
end
@doc ~S"""
Runs the command with arguments and returns its stdout as a lazily evaluated
Enumerable stream, similar to [`Stream`](https://hexdocs.pm/elixir/Stream.html).
First parameter must be a list containing command with arguments.
Example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* Enumerable:
```
# List
Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
# Stream
Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65_536)) |> Enum.to_list()
```
* Collectable:
If the input is a function with arity 1, Exile will call that function
with a `Collectable` as the argument. The function must *push* input to this
collectable. Return value of the function is ignored.
```
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
By default no input is sent to the command.
* `exit_timeout` - Duration to wait for the external program to exit after completion
(when the stream ends). Defaults to `5000`
* `max_chunk_size` - Maximum size of iodata chunk emitted by the stream.
Chunk size can be less than the `max_chunk_size` depending on the amount of
data available to be read. Defaults to `65_536`
* `enable_stderr` - When set to true, output stream will contain stderr data along
with stdout. Stream data will be of the form `{:stdout, iodata}` or `{:stderr, iodata}`
to differentiate different streams. Defaults to false. See example below
* `ignore_epipe` - When set to true, reader can exit early without raising error.
Typically the writer gets an `EPIPE` error on write when the program terminates prematurely.
With `ignore_epipe` set to true this error will be ignored. This can be used to
match UNIX shell default behaviour. EPIPE is the error raised when the reader finishes
reading and closes the output pipe before the command completes. Defaults to `false`.
Remaining options are passed to `Exile.Process.start_link/2`
- If program exits with non-zero exit status then the `Exile.Process.Error` will be
- raised.
+ If program exits with non-zero exit status or :epipe then `Exile.Stream.AbnormalExit`
+ error will be raised with `exit_status` field set.
### Examples
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
Stream with stderr
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1),
input: File.stream!("music_video.mkv", [], 65_535),
enable_stderr: true
)
|> Stream.transform(
fn ->
File.open!("music.mp3", [:write, :binary])
end,
fn elem, file ->
case elem do
{:stdout, data} ->
# write stdout data to a file
:ok = IO.binwrite(file, data)
{:stderr, msg} ->
# write stderr output to console
:ok = IO.write(msg)
end
{[], file}
end,
fn file ->
:ok = File.close(file)
end
)
|> Stream.run()
```
"""
@type collectable_func() :: (Collectable.t() -> any())

@spec stream!(nonempty_list(String.t()),
        input: Enum.t() | collectable_func(),
        exit_timeout: timeout(),
        enable_stderr: boolean(),
        ignore_epipe: boolean(),
        max_chunk_size: pos_integer()
      ) :: Exile.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
  # Same builder as `stream/2`; `stream_exit_status: false` makes the stream
  # raise on abnormal exit instead of emitting the status as an element.
  build_opts = Keyword.put(opts, :stream_exit_status, false)
  Exile.Stream.__build__(cmd_with_args, build_opts)
end
@doc ~S"""
Same as `Exile.stream!/2` but the program exit status is passed as the last
element of the stream instead of being raised as an error.

The last element will be of the form `{:exit, {:status, exit_status}}`, where
`exit_status` is the integer exit status of the program (`0` on success).

If the stream is halted before the program output is exhausted (for example
with `Enum.take/2`), the exit is handled as in `Exile.stream!/2`, so
`Exile.Stream.AbnormalExit` may still be raised.

See `Exile.stream!/2` documentation for details about the options and
examples.
"""
@spec stream(nonempty_list(String.t()),
        input: Enum.t() | collectable_func(),
        exit_timeout: timeout(),
        enable_stderr: boolean(),
        ignore_epipe: boolean(),
        max_chunk_size: pos_integer()
      ) :: Exile.Stream.t()
def stream(cmd_with_args, opts \\ []) do
  Exile.Stream.__build__(cmd_with_args, Keyword.put(opts, :stream_exit_status, true))
end
end
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index a1ed86b..4ba734e 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,337 +1,352 @@
defmodule Exile.Stream do
@moduledoc """
Defines a `Exile.Stream` struct returned by `Exile.stream!/2`.
"""
alias Exile.Process
alias Exile.Process.Error
require Logger
# Exception raised from the stream cleanup when the program exits with a
# non-zero status, or when writing to its stdin failed with EPIPE.
+ defmodule AbnormalExit do
+ defexception [:message, :exit_status] # exit_status: integer exit status or :epipe
+
+ @impl true
+ def exception(:epipe) do # stdin write failed with EPIPE (see `ignore_epipe`)
+ msg = "program exited due to :epipe error"
+ %__MODULE__{message: msg, exit_status: :epipe}
+ end
+
+ def exception(exit_status) do # non-zero integer exit status
+ msg = "program exited with exit status: #{exit_status}"
+ %__MODULE__{message: msg, exit_status: exit_status}
+ end
+ end
+
defmodule Sink do
  @moduledoc false
  # Collectable wrapper around a running `Exile.Process`: every element
  # collected into it is written to the program's stdin.

  @type t :: %__MODULE__{process: Process.t(), ignore_epipe: boolean}
  defstruct [:process, :ignore_epipe]

  defimpl Collectable do
    def into(%{process: process}) do
      collector = fn
        :ok, {:cont, chunk} -> write_chunk!(process, chunk)
        acc, :done -> acc
        acc, :halt -> acc
      end

      {:ok, collector}
    end

    # A Collectable cannot be stopped other than by raising, so an EPIPE is
    # turned into an `Error`; the input streamer rescues it and reports
    # `{:error, :epipe}`.
    defp write_chunk!(process, chunk) do
      case Process.write(process, chunk) do
        :ok -> :ok
        {:error, :epipe} -> raise Error, "epipe"
      end
    end
  end
end
defstruct [:stream_opts, :process_opts, :cmd_with_args]

@typedoc "Struct members are private, do not depend on them"
@type t :: %__MODULE__{
        stream_opts: map(),
        process_opts: keyword(),
        cmd_with_args: [String.t()]
      }

# Options consumed by the stream itself; all remaining options are kept as
# process options and handed to `Exile.Process.start_link/2` on start.
@stream_opts [
  :exit_timeout,
  :max_chunk_size,
  :input,
  :enable_stderr,
  :ignore_epipe,
  :stream_exit_status
]

@doc false
@spec __build__(nonempty_list(String.t()), keyword()) :: t()
def __build__(cmd_with_args, opts) do
  {raw_stream_opts, process_opts} = Keyword.split(opts, @stream_opts)

  normalized =
    case normalize_stream_opts(raw_stream_opts) do
      {:ok, normalized} -> normalized
      {:error, error} -> raise ArgumentError, message: error
    end

  %__MODULE__{
    cmd_with_args: cmd_with_args,
    process_opts: process_opts,
    stream_opts: normalized
  }
end
defimpl Enumerable do
# Runs the command lazily via Stream.resource/3: the program is started when
# enumeration begins (start_fun), one chunk is read per step (next_fun), and
# the exit is awaited when the stream stops for any reason (after_fun).
# The accumulator is `{state, tag}`: `:running` while output is being read,
# `:eof` once output is exhausted and the status is not streamed (stream!/2),
# `:exited` once the status was emitted as the last element (stream/2).
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def reduce(arg, acc, fun) do
start_fun = fn ->
state = start_process(arg)
{state, :running}
end
next_fun = fn
{state, :exited} ->
{:halt, {state, :exited}}
{state, exit_state} ->
%{
process: process,
stream_opts: %{
enable_stderr: enable_stderr,
stream_exit_status: stream_exit_status,
max_chunk_size: max_chunk_size
}
} = state
case Process.read_any(process, max_chunk_size) do
:eof when stream_exit_status == false ->
{:halt, {state, :eof}}
:eof when stream_exit_status == true ->
# stream/2: emit `{:exit, ...}` as the final element
elem = [await_exit(state, :eof)]
{elem, {state, :exited}}
{:ok, {:stdout, x}} when enable_stderr == false ->
elem = [IO.iodata_to_binary(x)]
{elem, {state, exit_state}}
{:ok, {io_stream, x}} when enable_stderr == true ->
elem = [{io_stream, IO.iodata_to_binary(x)}]
{elem, {state, exit_state}}
{:error, errno} ->
raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
end
end
# `:exited` means the status was already emitted. Otherwise the program is
# awaited here; this also runs when the consumer halts early (`:running`),
# so stream/2 can raise AbnormalExit in that case as well.
after_fun = fn
{_state, :exited} ->
:ok
{state, exit_state} ->
case await_exit(state, exit_state) do
{:exit, {:status, 0}} ->
:ok
{:exit, {:status, exit_status}} ->
- raise Error, "command exited with status: #{inspect(exit_status)}"
+ raise AbnormalExit, exit_status
{:exit, :epipe} ->
- raise Error, "abnormal command exit, received EPIPE while writing to stdin"
+ raise AbnormalExit, :epipe
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
# Size, membership and slicing cannot be answered without running the
# program, so Enum falls back to the reduce/3-based implementations.
def count(_stream), do: {:error, __MODULE__}

def member?(_stream, _term), do: {:error, __MODULE__}

def slice(_stream), do: {:error, __MODULE__}
# Spawns the external program and the task that feeds its stdin; returns the
# state map threaded through the Stream.resource/3 callbacks.
defp start_process(%Exile.Stream{
       process_opts: process_opts,
       stream_opts: stream_opts,
       cmd_with_args: cmd_with_args
     }) do
  {:ok, process} =
    Process.start_link(
      cmd_with_args,
      Keyword.put(process_opts, :enable_stderr, stream_opts[:enable_stderr])
    )

  writer_task =
    %Sink{process: process, ignore_epipe: stream_opts[:ignore_epipe]}
    |> start_input_streamer(stream_opts.input)

  %{process: process, stream_opts: stream_opts, writer_task: writer_task}
end
# Starts the task that writes `input` to the program's stdin.
#
# The task result is `:ok` when all input was written (or there is no input)
# and `{:error, :epipe}` when a write failed because the program stopped
# reading. The result of a user-supplied collectable function is ignored, as
# documented on `Exile.stream!/2`.
@spec start_input_streamer(term, term) :: Task.t()
defp start_input_streamer(%Sink{process: process} = sink, input) do
  case input do
    :no_input ->
      # use `Task.completed(:ok)` when bumping min Elixir requirement
      Task.async(fn -> :ok end)

    {:enumerable, enum} ->
      start_writer(process, fn -> Enum.into(enum, sink) end)

    {:collectable, func} ->
      start_writer(process, fn -> func.(sink) end)
  end
end

# Runs `write_fun` in a task that owns the stdin pipe, normalizing its outcome
# to `:ok` or `{:error, :epipe}` (the Sink raises `Error` on EPIPE).
defp start_writer(process, write_fun) do
  Task.async(fn ->
    Process.change_pipe_owner(process, :stdin, self())

    try do
      _ = write_fun.()
      :ok
    rescue
      Error ->
        {:error, :epipe}
    end
  end)
end
# Waits for the external program and the stdin writer task, then maps the
# combined outcome to `{:exit, {:status, exit_status}}` or `{:exit, :epipe}`.
#
# `exit_state` is `:eof` when the program output was fully read, or
# `:running` when the consumer halted the stream early.
defp await_exit(state, exit_state) do
  %{
    process: process,
    stream_opts: %{ignore_epipe: ignore_epipe, exit_timeout: exit_timeout},
    writer_task: writer_task
  } = state

  # NOTE(review): only `{:ok, exit_status}` results of `Process.await_exit/2`
  # are handled below; confirm its return shape when `exit_timeout` elapses.
  result = Process.await_exit(process, exit_timeout)
  # Task.await/1 uses its default 5000 ms timeout.
  writer_task_status = Task.await(writer_task)

  case {exit_state, result, writer_task_status} do
    # reader exited early and epipe is ignored: whatever happened to the
    # program or to a pending write is treated as success
    {:running, {:ok, _status}, _} when ignore_epipe ->
      {:exit, {:status, 0}}

    # reader exited early while a write was pending: report the epipe and
    # ignore the exit status
    {:running, {:ok, _status}, {:error, :epipe}} ->
      {:exit, :epipe}

    # normal exit success case
    {_, {:ok, 0}, _} ->
      {:exit, {:status, 0}}

    # any other status, including a non-zero status after the reader exited
    # early with no pending write (previously a CaseClauseError)
    {_, {:ok, exit_status}, _} ->
      {:exit, {:status, exit_status}}
  end
end
end
# Classifies the `:input` option: no input, an Enumerable to stream into
# stdin, or an arity-1 function that pushes into a Collectable sink.
# Arity-1 functions are checked first so they are never treated as
# Enumerables.
@spec normalize_input(term) ::
        {:ok, :no_input}
        | {:ok, {:enumerable, term}}
        | {:ok, {:collectable, function}}
        | {:error, String.t()}
defp normalize_input(term) do
  cond do
    is_nil(term) ->
      {:ok, :no_input}

    is_function(term, 1) ->
      {:ok, {:collectable, term}}

    Enumerable.impl_for(term) ->
      {:ok, {:enumerable, term}}

    true ->
      {:error, "`:input` must be either Enumerable or a function which accepts collectable"}
  end
end
# Validates `:max_chunk_size` (the size passed to `Process.read_any/2`);
# when the option is absent 65_536 is used.
defp normalize_max_chunk_size(nil), do: {:ok, 65_536}

defp normalize_max_chunk_size(size) when is_integer(size) and size > 0,
  do: {:ok, size}

defp normalize_max_chunk_size(_invalid),
  do: {:error, ":max_chunk_size must be a positive integer"}
# Validates `:exit_timeout`, which the public specs type as `timeout()`.
# Accepts `:infinity` or a positive integer; when the option is absent 5000
# is used.
# NOTE(review): assumes `Exile.Process.await_exit/2` accepts `:infinity`, as
# the `timeout()` spec suggests — confirm.
defp normalize_exit_timeout(timeout) do
  case timeout do
    nil ->
      {:ok, 5000}

    :infinity ->
      {:ok, :infinity}

    timeout when is_integer(timeout) and timeout > 0 ->
      {:ok, timeout}

    _ ->
      {:error, ":exit_timeout must be either :infinity or a positive integer"}
  end
end
defp normalize_enable_stderr(enable_stderr),
  do: normalize_boolean(enable_stderr, false, ":enable_stderr must be a boolean")

defp normalize_ignore_epipe(ignore_epipe),
  do: normalize_boolean(ignore_epipe, false, ":ignore_epipe must be a boolean")

defp normalize_stream_exit_status(stream_exit_status),
  do: normalize_boolean(stream_exit_status, false, ":stream_exit_status must be a boolean")

# Shared validator for boolean options: an absent option (`nil`) becomes
# `default`, a boolean is kept, anything else yields `{:error, error}`.
defp normalize_boolean(nil, default, _error), do: {:ok, default}
defp normalize_boolean(value, _default, _error) when is_boolean(value), do: {:ok, value}
defp normalize_boolean(_value, _default, error), do: {:error, error}
# Runs every stream option through its validator, in order, collecting the
# normalized values into a map. The first `{:error, _}` (or any other
# non-`{:ok, _}` result) is returned unchanged.
defp normalize_stream_opts(opts) do
  validators = [
    input: &normalize_input/1,
    exit_timeout: &normalize_exit_timeout/1,
    max_chunk_size: &normalize_max_chunk_size/1,
    enable_stderr: &normalize_enable_stderr/1,
    ignore_epipe: &normalize_ignore_epipe/1,
    stream_exit_status: &normalize_stream_exit_status/1
  ]

  Enum.reduce_while(validators, {:ok, %{}}, fn {key, normalize}, {:ok, normalized} ->
    case normalize.(opts[key]) do
      {:ok, value} -> {:cont, {:ok, Map.put(normalized, key, value)}}
      failure -> {:halt, failure}
    end
  end)
end
end
diff --git a/test/exile_test.exs b/test/exile_test.exs
index be9709f..7069d05 100644
--- a/test/exile_test.exs
+++ b/test/exile_test.exs
@@ -1,106 +1,123 @@
defmodule ExileTest do
use ExUnit.Case
doctest Exile
test "stream with enumerable" do
  input = Stream.map(1..1000, fn _ -> "a" end)

  stdout =
    ["cat"]
    |> Exile.stream!(input: input, enable_stderr: false)
    |> Enum.to_list()

  assert IO.iodata_length(stdout) == 1000
end

test "stream with collectable" do
  writer = fn sink -> Enum.into(1..1000, sink, fn _ -> "a" end) end
  stdout = ["cat"] |> Exile.stream!(input: writer) |> Enum.to_list()
  assert IO.iodata_length(stdout) == 1000
end

test "stream without stdin" do
  output =
    ~w(echo hello)
    |> Exile.stream!()
    |> Enum.to_list()
    |> IO.iodata_to_binary()

  assert output == "hello\n"
end

test "stderr" do
  assert {[], stderr} =
           ["sh", "-c", "echo foo >>/dev/stderr"]
           |> Exile.stream!(enable_stderr: true)
           |> split_stream()

  assert IO.iodata_to_binary(stderr) == "foo\n"
end
test "multiple streams" do
  # `{1..1000}` brace expansion is not POSIX; under `sh` implementations such
  # as dash the original loop ran only once. Use an explicit counter so 1000
  # lines are really written to each stream.
  script = """
  i=1
  while [ "$i" -le 1000 ]; do
    echo "foo ${i}"
    echo "bar ${i}" >&2
    i=$((i + 1))
  done
  """

  proc_stream = Exile.stream!(["sh", "-c", script], enable_stderr: true)
  {stdout, stderr} = split_stream(proc_stream)
  stdout_lines = String.split(Enum.join(stdout), "\n", trim: true)
  stderr_lines = String.split(Enum.join(stderr), "\n", trim: true)
  assert length(stdout_lines) == 1000
  assert length(stderr_lines) == 1000
  assert Enum.all?(stdout_lines, &String.starts_with?(&1, "foo "))
  assert Enum.all?(stderr_lines, &String.starts_with?(&1, "bar "))
end
test "environment variable" do
  # extra options such as `env` are passed through to the process
  stdout = Exile.stream!(~w(printenv FOO), env: %{"FOO" => "bar"}) |> Enum.into("")
  assert stdout == "bar\n"
end
test "premature stream termination" do
# Halting after one chunk while the writer is presumably still feeding the
# 100_000-chunk input should make a write fail with EPIPE; without
# `ignore_epipe` that surfaces as AbnormalExit(:epipe).
input_stream = Stream.map(1..100_000, fn _ -> "hello" end)
- assert_raise Exile.Process.Error,
- "abnormal command exit, received EPIPE while writing to stdin",
+ assert_raise Exile.Stream.AbnormalExit,
+ "program exited due to :epipe error",
fn ->
Exile.stream!(~w(cat), input: input_stream)
|> Enum.take(1)
end
end
test "premature stream termination when ignore_epipe is true" do
  input_stream = Stream.map(1..100_000, fn _ -> "hello" end)

  first_chunk =
    ~w(cat)
    |> Exile.stream!(input: input_stream, ignore_epipe: true, max_chunk_size: 5)
    |> Enum.take(1)

  assert first_chunk == ["hello"]
end
test "stream!/2 with exit status" do
# stream!/2 raises once the output is fully consumed and the status is non-zero
proc_stream = Exile.stream!(["sh", "-c", "exit 10"])
- assert_raise Exile.Process.Error, "command exited with status: 10", fn ->
+ assert_raise Exile.Stream.AbnormalExit, "program exited with exit status: 10", fn ->
Enum.to_list(proc_stream)
end
end
test "stream/2 with exit status" do
  # stream/2 emits the exit status as the final element instead of raising
  assert [{:exit, {:status, 10}}] ==
           ["sh", "-c", "exit 10"] |> Exile.stream() |> Enum.to_list()
end
+ test "stream!/2 abnormal exit status" do # the raised AbnormalExit carries the integer exit status
+ proc_stream = Exile.stream!(["sh", "-c", "exit 5"])
+
+ exit_status =
+ try do # NOTE(review): assert_raise/2 returns the exception and could replace this try
+ proc_stream
+ |> Enum.to_list()
+
+ nil # reached only when nothing was raised, failing the assertion below
+ rescue
+ e in Exile.Stream.AbnormalExit ->
+ e.exit_status
+ end
+
+ assert exit_status == 5
+ end
+
# Splits an `enable_stderr: true` stream into `{stdout_chunks, stderr_chunks}`,
# preserving chunk order within each output; any other element raises.
defp split_stream(stream) do
  {out_rev, err_rev} =
    Enum.reduce(stream, {[], []}, fn
      {:stdout, chunk}, {out_acc, err_acc} -> {[chunk | out_acc], err_acc}
      {:stderr, chunk}, {out_acc, err_acc} -> {out_acc, [chunk | err_acc]}
    end)

  {:lists.reverse(out_rev), :lists.reverse(err_rev)}
end
end

File Metadata

Mime Type
text/x-diff
Expires
Tue, Nov 26, 8:27 AM (1 d, 13 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40249
Default Alt Text
(22 KB)

Event Timeline