Page MenuHomePhorge

No OneTemporary

Size
25 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/tesla/adapter/mint.ex b/lib/tesla/adapter/mint.ex
index 44ce4fb..6909585 100644
--- a/lib/tesla/adapter/mint.ex
+++ b/lib/tesla/adapter/mint.ex
@@ -1,192 +1,329 @@
if Code.ensure_loaded?(Mint.HTTP) do
defmodule Tesla.Adapter.Mint do
@moduledoc """
- Adapter for [mint](https://github.com/ericmj/mint)
-
- Caution: The minimum supported Elixir version for mint is 1.5.0
-
- Remember to add `{:mint, "~> 0.2.0"}` and `{:castore, "~> 0.1.0"}` to dependencies
- Also, you need to recompile tesla after adding `:mint` dependency:
-
- ```
- mix deps.clean tesla
- mix deps.compile tesla
- ```
-
- ### Example usage
- ```
- # set globally in config/config.exs
- config :tesla, :adapter, Tesla.Adapter.Mint
-
- # set per module
- defmodule MyClient do
- use Tesla
-
- adapter Tesla.Adapter.Mint
- end
-
- # set global custom cacert
- config :tesla, Tesla.Adapter.Mint, cacert: ["path_to_cacert"]
+ Adapter for [mint](https://github.com/ericmj/mint)
+ Caution: The minimum supported Elixir version for mint is 1.5.0
+ Remember to add `{:mint, "~> 1.0"}` and `{:castore, "~> 0.1"}` to dependencies
+ Also, you need to recompile tesla after adding `:mint` dependency:
+ ```
+ mix deps.clean tesla
+ mix deps.compile tesla
+ ```
+ ### Example usage
+ ```
+ # set globally in config/config.exs
+ config :tesla, :adapter, Tesla.Adapter.Mint
+ # set per module
+ defmodule MyClient do
+ use Tesla
+ adapter Tesla.Adapter.Mint
+ end
+ ```
+ ### Adapter specific options:
+ * `timeout` - Time in milliseconds that the calling process will wait for mint messages. Defaults to `2_000`.
+ * `body_as` - What will be returned in `%Tesla.Env{}` body key. Possible values - `:plain`, `:stream`, `:chunks`. Defaults to `:plain`.
+ * `:plain` - as binary.
+ * `:stream` - as stream. If you don't want to close connection (because you want to reuse it later) pass `close_conn: false` in adapter opts.
+ * `:chunks` - as chunks. You can get response body in chunks using `Tesla.Adapter.Mint.read_chunk/3` function.
+ You must process the chunks and check the body size yourself. An example processing function is in `test/tesla/adapter/mint_test.exs` - `Tesla.Adapter.MintTest.read_body/4`. If you don't need the connection later, don't forget to close it with `Tesla.Adapter.Mint.close/1`.
+ * `max_body` - Max response body size in bytes. Works only with `body_as: :plain`; with other settings you need to check the response body size yourself.
+ * `conn` - Opened connection with mint. Is used for reusing mint connections.
+ * `original` - Original host with port for which the reused connection was opened. Needed for `Tesla.Middleware.FollowRedirects`, so that the connection is not reused for a different host after a redirect.
+ * `close_conn` - Whether to close the connection after receiving the full response body. Set to `false` to reuse mint connections. Defaults to `true`.
+ * `proxy` - Proxy settings. E.g.: `{:http, "localhost", 8888, []}`, `{:http, "127.0.0.1", 8888, []}`
"""
@behaviour Tesla.Adapter
- import Tesla.Adapter.Shared, only: [stream_to_fun: 1, next_chunk: 1]
+ import Tesla.Adapter.Shared
alias Tesla.Multipart
alias Mint.HTTP
- @default adapter: [timeout: 2_000]
+ @default timeout: 2_000, body_as: :plain, close_conn: true, mode: :active
- @doc false
+ @impl true
def call(env, opts) do
opts = Tesla.Adapter.opts(@default, env, opts)
with {:ok, status, headers, body} <- request(env, opts) do
{:ok, %{env | status: status, headers: headers, body: body}}
end
end
- defp request(env, opts) do
- # Break the URI
- %URI{host: host, scheme: scheme, port: port, path: path, query: query} = URI.parse(env.url)
- query = (query || "") |> URI.decode_query() |> Map.to_list()
- path = Tesla.build_url(path, env.query ++ query)
-
- method = env.method |> Atom.to_string() |> String.upcase()
+ @doc """
+ Reads chunk of the response body.
+ Returns `{:fin, HTTP.t(), binary()}` if the whole body has been received, otherwise returns `{:nofin, HTTP.t(), binary()}`.
+ """
- # Set the global cacert file
- opts =
- if scheme == "https" && !is_nil(get_global_default_ca()) do
- transport_opts = Access.get(opts, :transport_opts, [])
+ @spec read_chunk(HTTP.t(), reference(), keyword()) ::
+ {:fin, HTTP.t(), binary()} | {:nofin, HTTP.t(), binary()}
+ def read_chunk(conn, ref, opts) do
+ with {:ok, conn, acc} <- receive_packet(conn, ref, opts),
+ {state, data} <- response_state(acc) do
+ {:ok, conn} =
+ if state == :fin and opts[:close_conn] do
+ close(conn)
+ else
+ {:ok, conn}
+ end
- transport_opts =
- Keyword.put(
- transport_opts,
- :cacertfile,
- Keyword.get(transport_opts, :cacertfile, []) ++ get_global_default_ca()
- )
+ {state, conn, data}
+ end
+ end
- Keyword.put(opts, :transport_opts, transport_opts)
- else
- opts
- end
+ @doc """
+ Closes mint connection.
+ """
+ @spec close(HTTP.t()) :: {:ok, HTTP.t()}
+ defdelegate close(conn), to: HTTP
+ defp request(env, opts) do
request(
- method,
- scheme,
- host,
- port,
- path,
+ format_method(env.method),
+ Tesla.build_url(env.url, env.query),
env.headers,
env.body,
- opts
+ Enum.into(opts, %{})
)
end
- defp request(method, scheme, host, port, path, headers, %Stream{} = body, opts) do
+ defp request(method, url, headers, %Stream{} = body, opts) do
fun = stream_to_fun(body)
- request(method, scheme, host, port, path, headers, fun, opts)
+ request(method, url, headers, fun, opts)
end
- defp request(method, scheme, host, port, path, headers, %Multipart{} = body, opts) do
+ defp request(method, url, headers, %Multipart{} = body, opts) do
headers = headers ++ Multipart.headers(body)
fun = stream_to_fun(Multipart.body(body))
- request(method, scheme, host, port, path, headers, fun, opts)
+ request(method, url, headers, fun, opts)
end
- defp request(method, scheme, host, port, path, headers, body, opts) when is_function(body) do
- with {:ok, conn} <- HTTP.connect(String.to_atom(scheme), host, port, opts),
- # FIXME Stream function in Mint will not append the content length after eof
- # This will trigger the failure in unit test
- {:ok, body, length} <- stream_request(body),
- {:ok, conn, _req_ref} <-
- HTTP.request(
- conn,
- method,
- path || "/",
- headers ++ [{"content-length", "#{length}"}],
- body
- ),
- {:ok, conn, res = %{status: status, headers: headers}} <- stream_response(conn, opts),
- {:ok, _conn} <- HTTP.close(conn) do
- {:ok, status, headers, Map.get(res, :data)}
+ defp request(method, url, headers, body, opts),
+ do: do_request(method, url, headers, body, opts)
+
+ defp do_request(method, url, headers, body, opts) do
+ with uri <- URI.parse(url),
+ path <- prepare_path(uri.path, uri.query),
+ opts <- check_original(uri, opts),
+ {:ok, conn, opts} <- open_conn(uri, opts),
+ {:ok, conn, ref} <- make_request(conn, method, path, headers, body) do
+ format_response(conn, ref, opts)
end
end
- defp request(method, scheme, host, port, path, headers, body, opts) do
- with {:ok, conn} <- HTTP.connect(String.to_atom(scheme), host, port, opts),
- {:ok, conn, _req_ref} <- HTTP.request(conn, method, path || "/", headers, body),
- {:ok, conn, res = %{status: status, headers: headers}} <- stream_response(conn, opts),
- {:ok, _conn} <- HTTP.close(conn) do
- {:ok, status, headers, Map.get(res, :data)}
+ defp check_original(uri, %{original: original} = opts) do
+ Map.put(opts, :original_matches, original == "#{uri.host}:#{uri.port}")
+ end
+
+ defp check_original(_uri, opts), do: opts
+
+ defp open_conn(_uri, %{conn: conn, original_matches: true} = opts) do
+ {:ok, conn, opts}
+ end
+
+ defp open_conn(uri, %{conn: conn, original_matches: false} = opts) do
+ opts =
+ opts
+ |> Map.put_new(:old_conn, conn)
+ |> Map.delete(:conn)
+
+ open_conn(uri, opts)
+ end
+
+ defp open_conn(uri, opts) do
+ opts =
+ with "https" <- uri.scheme,
+ true <- opts[:certificates_verification] do
+ Map.update(opts, :transport_opts, [verify: :verify_peer], fn t_opts ->
+ Keyword.put(t_opts, :verify, :verify_peer)
+ end)
+ else
+ _ -> opts
+ end
+
+ with {:ok, conn} <-
+ HTTP.connect(String.to_atom(uri.scheme), uri.host, uri.port, Enum.into(opts, [])) do
+ # If there were redirects and `close_conn: false` was passed, we need to close the connections opened to these intermediate hosts.
+ {:ok, conn, Map.put(opts, :close_conn, true)}
end
end
- defp get_global_default_ca() do
- case Application.get_env(:tesla, Tesla.Adapter.Mint) do
- nil -> nil
- env -> Keyword.get(env, :cacert)
+ defp make_request(conn, method, path, headers, body) when is_function(body) do
+ with {:ok, conn, ref} <-
+ HTTP.request(
+ conn,
+ method,
+ path,
+ headers,
+ :stream
+ ),
+ {:ok, conn} <- stream_request(conn, ref, body) do
+ {:ok, conn, ref}
end
end
- defp stream_request(fun, body \\ "") do
+ defp make_request(conn, method, path, headers, body),
+ do: HTTP.request(conn, method, path, headers, body)
+
+ defp stream_request(conn, ref, fun) do
case next_chunk(fun) do
{:ok, item, fun} when is_list(item) ->
- stream_request(fun, body <> List.to_string(item))
+ chunk = List.to_string(item)
+ {:ok, conn} = HTTP.stream_request_body(conn, ref, chunk)
+ stream_request(conn, ref, fun)
{:ok, item, fun} ->
- stream_request(fun, body <> item)
+ {:ok, conn} = HTTP.stream_request_body(conn, ref, item)
+ stream_request(conn, ref, fun)
:eof ->
- {:ok, body, byte_size(body)}
+ HTTP.stream_request_body(conn, ref, :eof)
end
end
- defp stream_response(conn, opts, response \\ %{}) do
- receive do
- msg ->
- case HTTP.stream(conn, msg) do
- {:ok, conn, stream} ->
- response =
- Enum.reduce(stream, response, fn
- {:status, _req_ref, code}, acc ->
- Map.put(acc, :status, code)
-
- {:headers, _req_ref, headers}, acc ->
- Map.put(acc, :headers, Map.get(acc, :headers, []) ++ headers)
+ defp format_response(conn, ref, %{body_as: :plain} = opts) do
+ with {:ok, response} <- receive_responses(conn, ref, opts) do
+ {:ok, response[:status], response[:headers], response[:data]}
+ end
+ end
- {:data, _req_ref, data}, acc ->
- Map.put(acc, :data, Map.get(acc, :data, "") <> data)
+ defp format_response(conn, ref, %{body_as: :chunks} = opts) do
+ with {:ok, conn, %{status: status, headers: headers} = acc} <-
+ receive_headers_and_status(conn, ref, opts),
+ {state, data} <-
+ response_state(acc) do
+ {:ok, conn} =
+ if state == :fin and opts[:close_conn] do
+ close(conn)
+ else
+ {:ok, conn}
+ end
- {:done, _req_ref}, acc ->
- Map.put(acc, :done, true)
+ {:ok, status, headers, %{conn: conn, ref: ref, opts: opts, body: {state, data}}}
+ end
+ end
- {:error, _req_ref, reason}, acc ->
- Map.put(acc, :error, reason)
+ defp format_response(conn, ref, %{body_as: :stream} = opts) do
+ # there can be some data already
+ with {:ok, conn, %{status: status, headers: headers} = acc} <-
+ receive_headers_and_status(conn, ref, opts) do
+ body_as_stream =
+ Stream.resource(
+ fn -> %{conn: conn, data: acc[:data], done: acc[:done]} end,
+ fn
+ %{conn: conn, data: data, done: true} ->
+ {[data], %{conn: conn, is_fin: true}}
+
+ %{conn: conn, data: data} when is_binary(data) ->
+ {[data], %{conn: conn}}
+
+ %{conn: conn, is_fin: true} ->
+ {:halt, %{conn: conn}}
+
+ %{conn: conn} ->
+ case receive_packet(conn, ref, opts) do
+ {:ok, conn, %{done: true, data: data}} ->
+ {[data], %{conn: conn, is_fin: true}}
+
+ {:ok, conn, %{done: true}} ->
+ {[], %{conn: conn, is_fin: true}}
+
+ {:ok, conn, %{data: data}} ->
+ {[data], %{conn: conn}}
+
+ {:ok, conn, _} ->
+ {[], %{conn: conn}}
+ end
+ end,
+ fn %{conn: conn} -> if opts[:close_conn], do: {:ok, _conn} = close(conn) end
+ )
+
+ {:ok, status, headers, body_as_stream}
+ end
+ end
- _, acc ->
- acc
- end)
+ defp receive_responses(conn, ref, opts, acc \\ %{}) do
+ with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc),
+ :ok <- check_data_size(acc, conn, opts) do
+ if acc[:done] do
+ if opts[:close_conn], do: {:ok, _conn} = close(conn)
+ {:ok, acc}
+ else
+ receive_responses(conn, ref, opts, acc)
+ end
+ end
+ end
- cond do
- Map.has_key?(response, :error) ->
- {:error, Map.get(response, :error)}
+ defp check_data_size(%{data: data}, conn, %{max_body: max_body} = opts)
+ when is_binary(data) do
+ if max_body - byte_size(data) >= 0 do
+ :ok
+ else
+ if opts[:close_conn], do: {:ok, _conn} = close(conn)
+ {:error, :body_too_large}
+ end
+ end
- Map.has_key?(response, :done) ->
- {:ok, conn, Map.drop(response, [:done])}
+ defp check_data_size(_, _, _), do: :ok
- true ->
- stream_response(conn, opts, response)
- end
+ defp receive_headers_and_status(conn, ref, opts, acc \\ %{}) do
+ with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do
+ case acc do
+ %{status: _status, headers: _headers} -> {:ok, conn, acc}
+ # if we don't have status or headers we try to get them in next packet
+ _ -> receive_headers_and_status(conn, ref, opts, acc)
+ end
+ end
+ end
- {:error, _conn, error, _res} ->
- {:error, "Encounter Mint error #{inspect(error)}"}
+ defp response_state(%{done: true, data: data}), do: {:fin, data}
+ defp response_state(%{data: data}), do: {:nofin, data}
+ defp response_state(%{done: true}), do: {:fin, ""}
+ defp response_state(_), do: {:nofin, ""}
+
+ defp receive_packet(conn, ref, opts, acc \\ %{}) do
+ with {:ok, conn, responses} <- receive_message(conn, opts),
+ acc <- reduce_responses(responses, ref, acc) do
+ {:ok, conn, acc}
+ else
+ {:error, error} ->
+ if opts[:close_conn], do: {:ok, _conn} = close(conn)
+ {:error, error}
+
+ {:error, _conn, error, _res} ->
+ if opts[:close_conn], do: {:ok, _conn} = close(conn)
+ {:error, "Encounter Mint error #{inspect(error)}"}
+
+ :unknown ->
+ if opts[:close_conn], do: {:ok, _conn} = close(conn)
+ {:error, :unknown}
+ end
+ end
- :unknown ->
- {:error, "Encounter unknown error"}
- end
+ defp receive_message(conn, %{mode: :active} = opts) do
+ receive do
+ message ->
+ HTTP.stream(conn, message)
after
- opts |> Keyword.get(:adapter) |> Keyword.get(:timeout) ->
- {:error, "Response timeout"}
+ opts[:timeout] -> {:error, :timeout}
end
end
+
+ defp receive_message(conn, %{mode: :passive} = opts),
+ do: HTTP.recv(conn, 0, opts[:timeout])
+
+ defp reduce_responses(responses, ref, acc) do
+ Enum.reduce(responses, acc, fn
+ {:status, ^ref, code}, acc ->
+ Map.put(acc, :status, code)
+
+ {:headers, ^ref, headers}, acc ->
+ Map.update(acc, :headers, headers, &(&1 ++ headers))
+
+ {:data, ^ref, data}, acc ->
+ Map.update(acc, :data, data, &(&1 <> data))
+
+ {:done, ^ref}, acc ->
+ Map.put(acc, :done, true)
+ end)
+ end
end
end
diff --git a/test/tesla/adapter/mint_test.exs b/test/tesla/adapter/mint_test.exs
index 8d92233..27094dc 100644
--- a/test/tesla/adapter/mint_test.exs
+++ b/test/tesla/adapter/mint_test.exs
@@ -1,19 +1,286 @@
defmodule Tesla.Adapter.MintTest do
use ExUnit.Case
use Tesla.AdapterCase, adapter: Tesla.Adapter.Mint
use Tesla.AdapterCase.Basic
use Tesla.AdapterCase.Multipart
use Tesla.AdapterCase.StreamRequestBody
- # TODO: Disabled temporarily
- # use Tesla.AdapterCase.SSL
+
+ use Tesla.AdapterCase.SSL,
+ transport_opts: [cacertfile: "./deps/httparrot/priv/ssl/server-ca.crt"]
test "Delay request" do
request = %Env{
method: :head,
url: "#{@http}/delay/1"
}
- assert {:error, "Response timeout"} = call(request, adapter: [timeout: 100])
+ assert {:error, :timeout} = call(request, timeout: 100)
+ end
+
+ test "max_body option" do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/100"
+ }
+
+ assert {:error, :body_too_large} = call(request, max_body: 5)
+ end
+
+ test "response body as stream" do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/1500"
+ }
+
+ assert {:ok, %Env{} = response} = call(request, body_as: :stream)
+ assert response.status == 200
+ assert is_function(response.body)
+ assert Enum.join(response.body) |> byte_size() == 2245
+ end
+
+ test "response body as chunks with closing body with default" do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/10"
+ }
+
+ assert {:ok, %Env{} = response} = call(request, body_as: :chunks)
+ assert response.status == 200
+ %{conn: conn, ref: ref, opts: opts, body: body} = response.body
+ assert opts[:body_as] == :chunks
+
+ {:ok, conn, received_body} = read_body(conn, ref, opts, body)
+ assert byte_size(received_body) == 16
+
+ assert conn.state == :closed
+ end
+
+ test "certificates_verification" do
+ request = %Env{
+ method: :get,
+ url: "#{@https}/stream-bytes/10"
+ }
+
+ assert {:ok, %Env{} = response} =
+ call(request,
+ certificates_verification: true,
+ transport_opts: [
+ verify_fun:
+ {fn
+ _cert, _reason, state ->
+ {:valid, state}
+ end, nil}
+ ]
+ )
+
+ assert response.status == 200
+ assert byte_size(response.body) == 16
+ end
+
+ describe "mode: :passive" do
+ test "body_as: :plain" do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/10"
+ }
+
+ assert {:ok, %Env{} = response} = call(request, mode: :passive)
+ assert response.status == 200
+ assert byte_size(response.body) == 16
+ end
+
+ test "body_as: :stream" do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/10"
+ }
+
+ assert {:ok, %Env{} = response} = call(request, body_as: :stream, mode: :passive)
+ assert response.status == 200
+ assert Enum.join(response.body) |> byte_size() == 16
+ end
+
+ test "body_as: :chunks" do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/10"
+ }
+
+ assert {:ok, %Env{} = response} = call(request, body_as: :chunks, mode: :passive)
+ assert response.status == 200
+ %{conn: conn, ref: ref, opts: opts, body: body} = response.body
+
+ {:ok, _conn, received_body} = read_body(conn, ref, opts, body)
+ assert byte_size(received_body) == 16
+ end
+ end
+
+ describe "500 error" do
+ test "body_as :plain" do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/status/500"
+ }
+
+ assert {:ok, %Env{} = response} = call(request)
+ assert response.status == 500
+ end
+
+ test "body_as :stream" do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/status/500"
+ }
+
+ assert {:ok, %Env{} = response} = call(request, body_as: :stream)
+ assert response.status == 500
+ end
+
+ test "body_as :chunks" do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/status/500"
+ }
+
+ assert {:ok, %Env{} = response} = call(request, body_as: :chunks)
+ assert response.status == 500
+ end
+ end
+
+ describe "reusing connection" do
+ setup do
+ uri = URI.parse(@http)
+ {:ok, conn} = Mint.HTTP.connect(:http, uri.host, uri.port)
+ {:ok, conn: conn, original: "#{uri.host}:#{uri.port}"}
+ end
+
+ test "body_as :plain", %{conn: conn, original: original} do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/10"
+ }
+
+ assert {:ok, %Env{} = response} =
+ call(request, conn: conn, original: original, close_conn: false)
+
+ assert response.status == 200
+ assert byte_size(response.body) == 16
+
+ assert {:ok, %Env{} = response} =
+ call(request, conn: conn, original: original, close_conn: false)
+
+ assert response.status == 200
+ assert byte_size(response.body) == 16
+
+ assert {:ok, conn} = Tesla.Adapter.Mint.close(conn)
+ assert conn.state == :closed
+ end
+
+ test "body_as :stream", %{conn: conn, original: original} do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/10"
+ }
+
+ assert {:ok, %Env{} = response} =
+ call(request,
+ conn: conn,
+ original: original,
+ close_conn: false,
+ body_as: :stream
+ )
+
+ assert response.status == 200
+ assert is_function(response.body)
+ assert Enum.join(response.body) |> byte_size() == 16
+
+ assert {:ok, %Env{} = response} =
+ call(request,
+ conn: conn,
+ original: original,
+ close_conn: false,
+ body_as: :stream
+ )
+
+ assert response.status == 200
+ assert is_function(response.body)
+ assert Enum.join(response.body) |> byte_size() == 16
+
+ assert {:ok, conn} = Tesla.Adapter.Mint.close(conn)
+ assert conn.state == :closed
+ end
+
+ test "body_as :chunks", %{conn: conn, original: original} do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/10"
+ }
+
+ assert {:ok, %Env{} = response} =
+ call(request,
+ conn: conn,
+ original: original,
+ close_conn: false,
+ body_as: :chunks
+ )
+
+ assert response.status == 200
+ assert %{conn: received_conn, ref: ref, opts: opts, body: body} = response.body
+ {:ok, conn, received_body} = read_body(received_conn, ref, opts, body)
+ assert byte_size(received_body) == 16
+ assert conn.socket == received_conn.socket
+
+ assert {:ok, %Env{} = response} =
+ call(request,
+ conn: conn,
+ original: original,
+ close_conn: false,
+ body_as: :chunks
+ )
+
+ assert response.status == 200
+ assert %{conn: received_conn, ref: ref, opts: opts, body: body} = response.body
+ {:ok, conn, received_body} = read_body(received_conn, ref, opts, body)
+ assert byte_size(received_body) == 16
+ assert conn.socket == received_conn.socket
+
+ {:ok, conn} = Tesla.Adapter.Mint.close(received_conn)
+ assert conn.state == :closed
+ end
+
+ test "don't reuse connection if original does not match", %{conn: conn} do
+ request = %Env{
+ method: :get,
+ url: "#{@http}/stream-bytes/10"
+ }
+
+ assert {:ok, %Env{} = response} =
+ call(request, body_as: :chunks, conn: conn, original: "example.com:80")
+
+ assert response.status == 200
+ %{conn: received_conn, ref: ref, opts: opts, body: body} = response.body
+
+ {:ok, received_conn, received_body} = read_body(received_conn, ref, opts, body)
+ assert byte_size(received_body) == 16
+ refute conn.socket == received_conn.socket
+ refute opts[:conn]
+ assert opts[:old_conn].socket == conn.socket
+ end
+ end
+
+ def read_body(conn, _ref, _opts, {:fin, body}), do: {:ok, conn, body}
+
+ def read_body(conn, ref, opts, {:nofin, acc}),
+ do: read_body(conn, ref, opts, acc)
+
+ def read_body(conn, ref, opts, acc) do
+ case Tesla.Adapter.Mint.read_chunk(conn, ref, opts) do
+ {:fin, conn, body} ->
+ {:ok, conn, acc <> body}
+
+ {:nofin, conn, part} ->
+ read_body(conn, ref, opts, acc <> part)
+ end
end
end

File Metadata

Mime Type
text/x-diff
Expires
Tue, Nov 26, 12:49 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40150
Default Alt Text
(25 KB)

Event Timeline