Page MenuHomePhorge

No OneTemporary

Size
15 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/exile.ex b/lib/exile.ex
index 98310f7..7bd3545 100644
--- a/lib/exile.ex
+++ b/lib/exile.ex
@@ -1,132 +1,139 @@
defmodule Exile do
@moduledoc ~S"""
Exile is an alternative for beam [ports](https://hexdocs.pm/elixir/Port.html)
with back-pressure and non-blocking IO.
### Quick Start
- ```
+ Read from stdout
+ ```
+ iex> random_data =
+ ...> Exile.stream!(~w(cat /dev/urandom), max_chunk_size: 100, ignore_epipe: true)
+ ...> |> Enum.take(5)
+ iex> byte_size(IO.iodata_to_binary(random_data))
+ 500
```
For more details about stream API, see `Exile.stream!`.
For more details about inner working, please check `Exile.Process`
documentation.
"""
use Application
@doc false
def start(_type, _args) do
  # Application callback: boots a DynamicSupervisor registered under the name
  # Exile.WatcherSupervisor. A DynamicSupervisor is used so that external
  # processes get cleaned up on :init.stop or SIGTERM.
  DynamicSupervisor.start_link(strategy: :one_for_one, name: Exile.WatcherSupervisor)
end
@doc ~S"""
Runs the command with arguments and returns the stdout as a lazy
Enumerable stream, similar to [`Stream`](https://hexdocs.pm/elixir/Stream.html).
First parameter must be a list containing command with arguments.
Example: `["cat", "file.txt"]`.
### Options
* `input` - Input can be either an `Enumerable` or a function which accepts `Collectable`.
* Enumerable:
```
# List
Exile.stream!(~w(base64), input: ["hello", "world"]) |> Enum.to_list()
# Stream
Exile.stream!(~w(cat), input: File.stream!("log.txt", [], 65_536)) |> Enum.to_list()
```
* Collectable:
If the input is a function with arity 1, Exile will call that function
with a `Collectable` as the argument. The function must *push* input to this
collectable. Return value of the function is ignored.
```
Exile.stream!(~w(cat), input: fn sink -> Enum.into(1..100, sink, &to_string/1) end)
|> Enum.to_list()
```
By default, no input is sent to the command.
* `exit_timeout` - Duration to wait for external program to exit after completion
(when stream ends). Defaults to `5000`
* `max_chunk_size` - Maximum size of iodata chunk emitted by the stream.
Chunk size can be less than the `max_chunk_size` depending on the amount of
data available to be read. Defaults to `65_536`
* `enable_stderr` - When set to true, output stream will contain stderr data along
with stdout. Stream data will be of the form `{:stdout, iodata}` or `{:stderr, iodata}`
to differentiate different streams. Defaults to false. See example below
- * `ignore_epipe` - when set to true, `EPIPE` error during the write will be ignored.
- This can be used to match UNIX shell default behaviour. EPIPE is the error raised
- when the reader finishes the reading and close output pipe before command completes.
- Defaults to `false`.
+ * `ignore_epipe` - When set to true, the reader can exit early without raising an error.
+ Typically the writer gets an `EPIPE` error on write when the program terminates prematurely.
+ With `ignore_epipe` set to true this error will be ignored. This can be used to
+ match UNIX shell default behaviour. EPIPE is the error raised when the reader finishes
+ reading and closes the output pipe before the command completes. Defaults to `false`.
Remaining options are passed to `Exile.Process.start_link/2`
### Examples
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()
```
Stream with stderr
```
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1),
input: File.stream!("music_video.mkv", [], 65_535),
enable_stderr: true
)
|> Stream.transform(
fn ->
File.open!("music.mp3", [:write, :binary])
end,
fn elem, file ->
case elem do
{:stdout, data} ->
# write stdout data to a file
:ok = IO.binwrite(file, data)
{:stderr, msg} ->
# write stderr output to console
:ok = IO.write(msg)
end
{[], file}
end,
fn file ->
:ok = File.close(file)
end
)
|> Stream.run()
```
"""
@type collectable_func() :: (Collectable.t() -> any())

# The option list names every stream-level option documented for this
# function. `enable_stderr` and `ignore_epipe` were previously missing from
# the spec even though they are documented and accepted.
@spec stream!(nonempty_list(String.t()),
        input: Enum.t() | collectable_func(),
        exit_timeout: timeout(),
        max_chunk_size: pos_integer(),
        enable_stderr: boolean(),
        ignore_epipe: boolean()
      ) :: Exile.Stream.t()
def stream!(cmd_with_args, opts \\ []) do
  # All validation and process start-up is delegated to Exile.Stream.
  Exile.Stream.__build__(cmd_with_args, opts)
end
end
diff --git a/lib/exile/stream.ex b/lib/exile/stream.ex
index 57ef065..c58cb32 100644
--- a/lib/exile/stream.ex
+++ b/lib/exile/stream.ex
@@ -1,258 +1,266 @@
defmodule Exile.Stream do
@moduledoc """
Defines a `Exile.Stream` struct returned by `Exile.stream!/2`.
"""
alias Exile.Process
alias Exile.Process.Error
require Logger
defmodule Sink do
  @moduledoc false

  @type t :: %__MODULE__{process: Process.t(), ignore_epipe: boolean}

  defstruct [:process, :ignore_epipe]

  defimpl Collectable do
    # Starts collection with the accumulator `:ok` and a collector that writes
    # each collected element to the sink's process.
    def into(%{process: process}) do
      {:ok, &collect(process, &1, &2)}
    end

    # Writes one element to the process.
    defp collect(process, :ok, {:cont, chunk}) do
      case Process.write(process, chunk) do
        :ok ->
          :ok

        {:error, :epipe} ->
          # Raising is the only way to stop a Collectable midway; the code
          # driving the collection rescues this error and turns it into
          # `{:error, :epipe}`.
          raise Error, "epipe"
      end
    end

    # Both terminal commands hand the accumulator back unchanged.
    defp collect(_process, acc, command) when command in [:done, :halt], do: acc
  end
end
defstruct [:process, :stream_opts, :writer_task]
@typedoc "Struct members are private, do not depend on them"
@type t :: %__MODULE__{process: Process.t(), stream_opts: map, writer_task: Task.t()}
@doc false
@spec __build__(nonempty_list(String.t()), keyword()) :: t()
def __build__(cmd_with_args, opts) do
  # Options consumed by the stream itself; whatever is left over is handed to
  # Process.start_link/2.
  stream_keys = [:exit_timeout, :max_chunk_size, :input, :enable_stderr, :ignore_epipe]
  {raw_stream_opts, process_opts} = Keyword.split(opts, stream_keys)

  case normalize_stream_opts(raw_stream_opts) do
    {:error, reason} ->
      # Invalid options are reported to the caller as an ArgumentError.
      raise ArgumentError, message: reason

    {:ok, normalized} ->
      # The process must know whether stderr is being consumed, so the
      # normalized flag is forwarded along with the remaining options.
      start_opts = Keyword.put(process_opts, :enable_stderr, normalized[:enable_stderr])
      {:ok, process} = Process.start_link(cmd_with_args, start_opts)

      sink = %Sink{process: process, ignore_epipe: normalized[:ignore_epipe]}
      writer_task = start_input_streamer(sink, normalized.input)

      %Exile.Stream{process: process, stream_opts: normalized, writer_task: writer_task}
  end
end
# Starts the task that feeds `input` to the external process through `sink`.
#
# `input` is the normalized `:input` option: `:no_input`,
# `{:enumerable, enum}` or `{:collectable, func}`.
#
# The task result is `:ok` when writing finished, or `{:error, :epipe}` when
# the sink raised `Error` while writing. The return value of a user supplied
# collectable function is discarded, so it can never be mistaken for a writer
# status by the code awaiting this task.
@spec start_input_streamer(Sink.t(), term) :: Task.t()
defp start_input_streamer(%Sink{process: process} = sink, input) do
  case input do
    :no_input ->
      # use `Task.completed(:ok)` when bumping min Elixir requirement
      Task.async(fn -> :ok end)

    {:enumerable, enum} ->
      start_writer_task(process, fn -> Enum.into(enum, sink) end)

    {:collectable, func} ->
      start_writer_task(process, fn ->
        func.(sink)
        :ok
      end)
  end
end

# Runs `write_fun` in a task after making that task the owner of the process'
# stdin pipe. An `Error` raised while writing is reported as `{:error, :epipe}`.
defp start_writer_task(process, write_fun) do
  Task.async(fn ->
    Process.change_pipe_owner(process, :stdin, self())

    try do
      write_fun.()
    rescue
      Error ->
        {:error, :epipe}
    end
  end)
end
defimpl Enumerable do
def reduce(arg, acc, fun) do
%{
process: process,
stream_opts:
%{
enable_stderr: enable_stderr,
ignore_epipe: ignore_epipe
} = stream_opts,
writer_task: writer_task
} = arg
- start_fun = fn -> :normal end
+ start_fun = fn -> :premature_exit end
- next_fun = fn :normal ->
+ next_fun = fn :premature_exit ->
case Process.read_any(process, stream_opts.max_chunk_size) do
:eof ->
- {:halt, :normal}
+ {:halt, :normal_exit}
{:ok, {:stdout, x}} when enable_stderr == false ->
- {[IO.iodata_to_binary(x)], :normal}
+ {[IO.iodata_to_binary(x)], :premature_exit}
{:ok, {stream, x}} when enable_stderr == true ->
- {[{stream, IO.iodata_to_binary(x)}], :normal}
+ {[{stream, IO.iodata_to_binary(x)}], :premature_exit}
{:error, errno} ->
- raise Error, "Failed to read from the external process. errno: #{errno}"
+ raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
end
end
after_fun = fn exit_type ->
result = Process.await_exit(process, stream_opts.exit_timeout)
writer_task_status = Task.await(writer_task)
case {exit_type, result, writer_task_status} do
- {:normal, {:ok, 0}, :ok} ->
+ # if reader exit early and there is a pending write
+ {:premature_exit, {:ok, _status}, {:error, :epipe}} when ignore_epipe ->
:ok
- {:normal, {:ok, _error}, {:error, :epipe}} when ignore_epipe ->
+ # if reader exit early and there is no pending write or if
+ # there is no writer
+ {:premature_exit, {:ok, _status}, :ok} when ignore_epipe ->
:ok
- {:normal, {:ok, _error}, {:error, :epipe}} ->
+ # if we get epipe from writer then raise that error, and ignore exit status
+ {:premature_exit, {:ok, _status}, {:error, :epipe}} when ignore_epipe == false ->
raise Error, "abnormal command exit, received EPIPE while writing to stdin"
- {:normal, {:ok, error}, _} ->
+ # Normal exit success case
+ {_, {:ok, 0}, _} ->
+ :ok
+
+ {:normal_exit, {:ok, error}, _} ->
raise Error, "command exited with status: #{inspect(error)}"
end
end
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
end
# The element count is not known up front; returning `{:error, module}` makes
# Enum fall back to the reduce-based default.
def count(_stream), do: {:error, __MODULE__}
# No direct membership check is available; returning `{:error, module}` makes
# Enum fall back to the reduce-based default.
def member?(_stream, _term), do: {:error, __MODULE__}
# Slicing is not supported directly; returning `{:error, module}` makes Enum
# fall back to the reduce-based default.
def slice(_stream), do: {:error, __MODULE__}
end
@spec normalize_input(term) ::
{:ok, :no_input} | {:ok, {:enumerable, term}} | {:ok, {:collectable, function}}
defp normalize_input(term) do
cond do
is_nil(term) ->
{:ok, :no_input}
- !is_function(term) && Enumerable.impl_for(term) ->
+ !is_function(term, 1) && Enumerable.impl_for(term) ->
{:ok, {:enumerable, term}}
is_function(term, 1) ->
{:ok, {:collectable, term}}
true ->
{:error, "`:input` must be either Enumerable or a function which accepts collectable"}
end
end
# Validates the `:max_chunk_size` option.
# A missing value falls back to 65_536; a positive integer is accepted as is;
# anything else is rejected with an error message.
defp normalize_max_chunk_size(nil), do: {:ok, 65_536}

defp normalize_max_chunk_size(size) when is_integer(size) and size > 0, do: {:ok, size}

defp normalize_max_chunk_size(_invalid) do
  {:error, ":max_chunk_size must be a positive integer"}
end
# Validates the `:exit_timeout` option.
#
# Returns `{:ok, timeout}` for a missing value (falls back to 5000), for
# `:infinity`, and for a positive integer; returns `{:error, message}` for
# anything else.
defp normalize_exit_timeout(timeout) do
  case timeout do
    nil ->
      {:ok, 5000}

    # `:infinity` is named as valid by the error message below, but used to
    # fall through to the error branch.
    # NOTE(review): presumably the consumer of this value accepts any
    # standard `timeout()` - confirm against Exile.Process.await_exit/2.
    :infinity ->
      {:ok, :infinity}

    timeout when is_integer(timeout) and timeout > 0 ->
      {:ok, timeout}

    _ ->
      {:error, ":exit_timeout must be either :infinity or an integer"}
  end
end
# Validates the `:enable_stderr` option.
# A missing value means `false`; booleans pass through; anything else is an error.
defp normalize_enable_stderr(nil), do: {:ok, false}

defp normalize_enable_stderr(flag) when is_boolean(flag), do: {:ok, flag}

defp normalize_enable_stderr(_invalid), do: {:error, ":enable_stderr must be a boolean"}
# Validates the `:ignore_epipe` option.
# A missing value means `false`; booleans pass through; anything else is an error.
defp normalize_ignore_epipe(nil), do: {:ok, false}

defp normalize_ignore_epipe(flag) when is_boolean(flag), do: {:ok, flag}

defp normalize_ignore_epipe(_invalid), do: {:error, ":ignore_epipe must be a boolean"}
# Runs every stream option through its validator and gathers the results into
# one map. The first validator that does not return `{:ok, value}` stops the
# chain, and its result is returned unchanged.
defp normalize_stream_opts(opts) do
  with {:ok, input} <- normalize_input(opts[:input]),
       {:ok, timeout} <- normalize_exit_timeout(opts[:exit_timeout]),
       {:ok, chunk_size} <- normalize_max_chunk_size(opts[:max_chunk_size]),
       {:ok, stderr?} <- normalize_enable_stderr(opts[:enable_stderr]),
       {:ok, epipe?} <- normalize_ignore_epipe(opts[:ignore_epipe]) do
    normalized = %{
      input: input,
      exit_timeout: timeout,
      max_chunk_size: chunk_size,
      enable_stderr: stderr?,
      ignore_epipe: epipe?
    }

    {:ok, normalized}
  end
end
end
diff --git a/test/exile_test.exs b/test/exile_test.exs
index fe7822a..6c7a207 100644
--- a/test/exile_test.exs
+++ b/test/exile_test.exs
@@ -1,90 +1,92 @@
defmodule ExileTest do
use ExUnit.Case
+ doctest Exile
+
test "stream with enumerable" do
proc_stream =
Exile.stream!(["cat"], input: Stream.map(1..1000, fn _ -> "a" end), enable_stderr: false)
stdout = proc_stream |> Enum.to_list()
assert IO.iodata_length(stdout) == 1000
end
test "stream with collectable" do
proc_stream =
Exile.stream!(["cat"], input: fn sink -> Enum.into(1..1000, sink, fn _ -> "a" end) end)
stdout = Enum.to_list(proc_stream)
assert IO.iodata_length(stdout) == 1000
end
test "stream without stdin" do
proc_stream = Exile.stream!(~w(echo hello))
stdout = Enum.to_list(proc_stream)
assert IO.iodata_to_binary(stdout) == "hello\n"
end
test "stderr" do
proc_stream = Exile.stream!(["sh", "-c", "echo foo >>/dev/stderr"], enable_stderr: true)
assert {[], stderr} = split_stream(proc_stream)
assert IO.iodata_to_binary(stderr) == "foo\n"
end
test "multiple streams" do
  # `{1..1000}` brace expansion is a bash extension. A POSIX `sh` treats it as
  # a literal word, so the loop body would run only once. A counter loop
  # produces 1000 iterations under any POSIX shell.
  script = """
  i=1
  while [ "$i" -le 1000 ]; do
    echo "foo ${i}"
    echo "bar ${i}" >&2
    i=$((i + 1))
  done
  """

  proc_stream = Exile.stream!(["sh", "-c", script], enable_stderr: true)
  {stdout, stderr} = split_stream(proc_stream)

  stdout_lines = String.split(Enum.join(stdout), "\n", trim: true)
  stderr_lines = String.split(Enum.join(stderr), "\n", trim: true)

  # Pin the expected volume so a shell that runs the loop once cannot pass.
  assert length(stdout_lines) == 1000
  assert length(stdout_lines) == length(stderr_lines)
  assert Enum.all?(stdout_lines, &String.starts_with?(&1, "foo "))
  assert Enum.all?(stderr_lines, &String.starts_with?(&1, "bar "))
end
test "environment variable" do
output =
Exile.stream!(~w(printenv FOO), env: %{"FOO" => "bar"})
|> Enum.to_list()
|> IO.iodata_to_binary()
assert output == "bar\n"
end
test "premature stream termination" do
input_stream = Stream.map(1..100_000, fn _ -> "hello" end)
assert_raise Exile.Process.Error,
"abnormal command exit, received EPIPE while writing to stdin",
fn ->
Exile.stream!(~w(cat), input: input_stream)
|> Enum.take(1)
end
end
test "premature stream termination when ignore_epipe is true" do
input_stream = Stream.map(1..100_000, fn _ -> "hello" end)
assert ["hello"] ==
Exile.stream!(~w(cat), input: input_stream, ignore_epipe: true, max_chunk_size: 5)
|> Enum.take(1)
end
# Separates a stream of `{:stdout, data}` / `{:stderr, data}` elements into
# `{stdout_chunks, stderr_chunks}`, each list in arrival order.
defp split_stream(stream) do
  # Chunks are prepended while reducing, so both lists come out reversed.
  {rev_out, rev_err} =
    Enum.reduce(stream, {[], []}, fn
      {:stdout, chunk}, {outs, errs} -> {[chunk | outs], errs}
      {:stderr, chunk}, {outs, errs} -> {outs, [chunk | errs]}
    end)

  {Enum.reverse(rev_out), Enum.reverse(rev_err)}
end
end

File Metadata

Mime Type
text/x-diff
Expires
Wed, Nov 27, 6:52 AM (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40572
Default Alt Text
(15 KB)

Event Timeline