Page MenuHomePhorge

No OneTemporary

Size
12 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/tesla/adapter/mint.ex b/lib/tesla/adapter/mint.ex
index c351a4c..92866d4 100644
--- a/lib/tesla/adapter/mint.ex
+++ b/lib/tesla/adapter/mint.ex
@@ -1,341 +1,346 @@
if Code.ensure_loaded?(Mint.HTTP) do
defmodule Tesla.Adapter.Mint do
@moduledoc """
Adapter for [mint](https://github.com/elixir-mint/mint).
Caution: The minimum supported Elixir version for mint is 1.5.0
Remember to add `{:mint, "~> 1.0"}` and `{:castore, "~> 0.1"}` to dependencies.
Also, you need to recompile tesla after adding `:mint` dependency:
```
mix deps.clean tesla
mix deps.compile tesla
```
## Example usage
```
# set globally in config/config.exs
config :tesla, :adapter, Tesla.Adapter.Mint
# set per module
defmodule MyClient do
use Tesla
adapter Tesla.Adapter.Mint
end
# set global custom cacertfile
config :tesla, Tesla.Adapter.Mint, cacert: ["path_to_cacert"]
```
## Adapter specific options:
- `:timeout` - Time in milliseconds, while process, will wait for mint messages. Defaults to `2_000`.
- `:body_as` - What will be returned in `%Tesla.Env{}` body key. Possible values - `:plain`, `:stream`, `:chunks`. Defaults to `:plain`.
- `:plain` - as binary.
- `:stream` - as stream. If you don't want to close connection (because you want to reuse it later) pass `close_conn: false` in adapter opts.
- `:chunks` - as chunks. You can get response body in chunks using `Tesla.Adapter.Mint.read_chunk/3` function.
Processing of the chunks and checking body size must be done by yourself. Example of processing function is in `test/tesla/adapter/mint_test.exs` - `Tesla.Adapter.MintTest.read_body/4`. If you don't need connection later don't forget to close it with `Tesla.Adapter.Mint.close/1`.
- `:max_body` - Max response body size in bytes. Works only with `body_as: :plain`, with other settings you need to check response body size by yourself.
- `:conn` - Opened connection with mint. Is used for reusing mint connections.
- `:original` - Original host with port, for which reused connection was open. Needed for `Tesla.Middleware.FollowRedirects`. Otherwise adapter will use connection for another open host.
- `:close_conn` - Close connection or not after receiving full response body. Is used for reusing mint connections. Defaults to `true`.
- `:proxy` - Proxy settings. E.g.: `{:http, "localhost", 8888, []}`, `{:http, "127.0.0.1", 8888, []}`
"""
@behaviour Tesla.Adapter
import Tesla.Adapter.Shared
alias Tesla.Multipart
alias Mint.HTTP
@default timeout: 2_000, body_as: :plain, close_conn: true, mode: :active
+ @tags [:tcp_error, :ssl_error, :tcp_closed, :ssl_closed, :tcp, :ssl]
+
@impl Tesla.Adapter
def call(env, opts) do
opts = Tesla.Adapter.opts(@default, env, opts)
with {:ok, status, headers, body} <- request(env, opts) do
{:ok, %{env | status: status, headers: headers, body: body}}
end
end
@doc """
Reads chunk of the response body.
Returns `{:fin, HTTP.t(), binary()}` if all body received, otherwise returns `{:nofin, HTTP.t(), binary()}`.
"""
@spec read_chunk(HTTP.t(), reference(), keyword()) ::
{:fin, HTTP.t(), binary()} | {:nofin, HTTP.t(), binary()}
def read_chunk(conn, ref, opts) do
with {:ok, conn, acc} <- receive_packet(conn, ref, Enum.into(opts, %{})),
{state, data} <- response_state(acc) do
{:ok, conn} =
if state == :fin and opts[:close_conn] do
close(conn)
else
{:ok, conn}
end
{state, conn, data}
end
end
@doc """
Closes mint connection.
"""
@spec close(HTTP.t()) :: {:ok, HTTP.t()}
defdelegate close(conn), to: HTTP
defp request(env, opts) do
request(
format_method(env.method),
Tesla.build_url(env.url, env.query),
env.headers,
env.body,
Enum.into(opts, %{})
)
end
defp request(method, url, headers, %Stream{} = body, opts) do
fun = stream_to_fun(body)
request(method, url, headers, fun, opts)
end
defp request(method, url, headers, %Multipart{} = body, opts) do
headers = headers ++ Multipart.headers(body)
fun = stream_to_fun(Multipart.body(body))
request(method, url, headers, fun, opts)
end
defp request(method, url, headers, body, opts),
do: do_request(method, url, headers, body, opts)
defp do_request(method, url, headers, body, opts) do
with uri <- URI.parse(url),
path <- prepare_path(uri.path, uri.query),
opts <- check_original(uri, opts),
{:ok, conn, opts} <- open_conn(uri, opts),
{:ok, conn, ref} <- make_request(conn, method, path, headers, body) do
format_response(conn, ref, opts)
end
end
defp check_original(uri, %{original: original} = opts) do
Map.put(opts, :original_matches, original == "#{uri.host}:#{uri.port}")
end
defp check_original(_uri, opts), do: opts
defp open_conn(_uri, %{conn: conn, original_matches: true} = opts) do
{:ok, conn, opts}
end
defp open_conn(uri, %{conn: conn, original_matches: false} = opts) do
opts =
opts
|> Map.put_new(:old_conn, conn)
|> Map.delete(:conn)
open_conn(uri, opts)
end
defp open_conn(uri, opts) do
opts =
with "https" <- uri.scheme,
global_cacertfile when not is_nil(global_cacertfile) <-
Application.get_env(:tesla, Tesla.Adapter.Mint)[:cacert] do
Map.update(opts, :transport_opts, [cacertfile: global_cacertfile], fn tr_opts ->
Keyword.put_new(tr_opts, :cacertfile, global_cacertfile)
end)
else
_ -> opts
end
with {:ok, conn} <-
HTTP.connect(String.to_atom(uri.scheme), uri.host, uri.port, Enum.into(opts, [])) do
# If there were redirects, and passed `closed_conn: false`, we need to close opened connections to these intermediate hosts.
{:ok, conn, Map.put(opts, :close_conn, true)}
end
end
defp make_request(conn, method, path, headers, body) when is_function(body) do
with {:ok, conn, ref} <-
HTTP.request(
conn,
method,
path,
headers,
:stream
),
{:ok, conn} <- stream_request(conn, ref, body) do
{:ok, conn, ref}
end
end
defp make_request(conn, method, path, headers, body),
do: HTTP.request(conn, method, path, headers, body)
defp stream_request(conn, ref, fun) do
case next_chunk(fun) do
{:ok, item, fun} when is_list(item) ->
chunk = List.to_string(item)
{:ok, conn} = HTTP.stream_request_body(conn, ref, chunk)
stream_request(conn, ref, fun)
{:ok, item, fun} ->
{:ok, conn} = HTTP.stream_request_body(conn, ref, item)
stream_request(conn, ref, fun)
:eof ->
HTTP.stream_request_body(conn, ref, :eof)
end
end
defp format_response(conn, ref, %{body_as: :plain} = opts) do
with {:ok, response} <- receive_responses(conn, ref, opts) do
{:ok, response[:status], response[:headers], response[:data]}
end
end
defp format_response(conn, ref, %{body_as: :chunks} = opts) do
with {:ok, conn, %{status: status, headers: headers} = acc} <-
receive_headers_and_status(conn, ref, opts),
{state, data} <-
response_state(acc) do
{:ok, conn} =
if state == :fin and opts[:close_conn] do
close(conn)
else
{:ok, conn}
end
{:ok, status, headers, %{conn: conn, ref: ref, opts: opts, body: {state, data}}}
end
end
defp format_response(conn, ref, %{body_as: :stream} = opts) do
# there can be some data already
with {:ok, conn, %{status: status, headers: headers} = acc} <-
receive_headers_and_status(conn, ref, opts) do
body_as_stream =
Stream.resource(
fn -> %{conn: conn, data: acc[:data], done: acc[:done]} end,
fn
%{conn: conn, data: data, done: true} ->
{[data], %{conn: conn, is_fin: true}}
%{conn: conn, data: data} when is_binary(data) ->
{[data], %{conn: conn}}
%{conn: conn, is_fin: true} ->
{:halt, %{conn: conn}}
%{conn: conn} ->
case receive_packet(conn, ref, opts) do
{:ok, conn, %{done: true, data: data}} ->
{[data], %{conn: conn, is_fin: true}}
{:ok, conn, %{done: true}} ->
{[], %{conn: conn, is_fin: true}}
{:ok, conn, %{data: data}} ->
{[data], %{conn: conn}}
{:ok, conn, _} ->
{[], %{conn: conn}}
end
end,
fn %{conn: conn} -> if opts[:close_conn], do: {:ok, _conn} = close(conn) end
)
{:ok, status, headers, body_as_stream}
end
end
defp receive_responses(conn, ref, opts, acc \\ %{}) do
with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc),
:ok <- check_data_size(acc, conn, opts) do
if acc[:done] do
if opts[:close_conn], do: {:ok, _conn} = close(conn)
{:ok, acc}
else
receive_responses(conn, ref, opts, acc)
end
end
end
defp check_data_size(%{data: data}, conn, %{max_body: max_body} = opts)
when is_binary(data) do
if max_body - byte_size(data) >= 0 do
:ok
else
if opts[:close_conn], do: {:ok, _conn} = close(conn)
{:error, :body_too_large}
end
end
defp check_data_size(_, _, _), do: :ok
defp receive_headers_and_status(conn, ref, opts, acc \\ %{}) do
with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do
case acc do
%{status: _status, headers: _headers} -> {:ok, conn, acc}
# if we don't have status or headers we try to get them in next packet
_ -> receive_headers_and_status(conn, ref, opts, acc)
end
end
end
defp response_state(%{done: true, data: data}), do: {:fin, data}
defp response_state(%{data: data}), do: {:nofin, data}
defp response_state(%{done: true}), do: {:fin, ""}
defp response_state(_), do: {:nofin, ""}
defp receive_packet(conn, ref, opts, acc \\ %{}) do
with {:ok, conn, responses} <- receive_message(conn, opts),
acc <- reduce_responses(responses, ref, acc) do
{:ok, conn, acc}
else
{:error, error} ->
if opts[:close_conn], do: {:ok, _conn} = close(conn)
{:error, error}
{:error, _conn, error, _res} ->
if opts[:close_conn], do: {:ok, _conn} = close(conn)
{:error, "Encounter Mint error #{inspect(error)}"}
:unknown ->
if opts[:close_conn], do: {:ok, _conn} = close(conn)
{:error, :unknown}
end
end
defp receive_message(conn, %{mode: :active} = opts) do
receive do
- message ->
+ {tag, _, _} = message when tag in @tags ->
+ HTTP.stream(conn, message)
+
+ {tag, _} = message when tag in @tags ->
HTTP.stream(conn, message)
after
opts[:timeout] -> {:error, :timeout}
end
end
defp receive_message(conn, %{mode: :passive} = opts),
do: HTTP.recv(conn, 0, opts[:timeout])
defp reduce_responses(responses, ref, acc) do
Enum.reduce(responses, acc, fn
{:status, ^ref, code}, acc ->
Map.put(acc, :status, code)
{:headers, ^ref, headers}, acc ->
Map.update(acc, :headers, headers, &(&1 ++ headers))
{:data, ^ref, data}, acc ->
Map.update(acc, :data, data, &(&1 <> data))
{:done, ^ref}, acc ->
Map.put(acc, :done, true)
end)
end
end
end

File Metadata

Mime Type
text/x-diff
Expires
Sun, Nov 24, 11:14 AM (22 h, 32 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
39410
Default Alt Text
(12 KB)

Event Timeline