Page MenuHomePhorge

No OneTemporary

Size
21 KB
Referenced Files
None
Subscribers
None
diff --git a/config/config.exs b/config/config.exs
index 2c43344..9d67bb7 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -1,24 +1,23 @@
use Mix.Config
config :tesla, adapter: Tesla.Adapter.Httpc
if Mix.env() == :test do
config :logger, :console,
level: :debug,
format: "$date $time [$level] $metadata$message\n"
config :httparrot,
http_port: 5080,
https_port: 5443,
ssl: true,
unix_socket: false
config :sasl,
errlog_type: :error,
sasl_error_logger: false
config :tesla, MockClient, adapter: Tesla.Mock
- config :tesla, Tesla.Adapter.Mint,
- cacert: ["./deps/httparrot/priv/ssl/server-ca.crt"]
+ config :tesla, Tesla.Adapter.Mint, cacert: ["./deps/httparrot/priv/ssl/server-ca.crt"]
end
diff --git a/lib/tesla/adapter/mint.ex b/lib/tesla/adapter/mint.ex
index 51c4b5f..1ba8538 100644
--- a/lib/tesla/adapter/mint.ex
+++ b/lib/tesla/adapter/mint.ex
@@ -1,321 +1,332 @@
if Code.ensure_loaded?(Mint.HTTP) do
defmodule Tesla.Adapter.Mint do
@moduledoc """
Adapter for [mint](https://github.com/ericmj/mint)
Caution: The minimum supported Elixir version for mint is 1.5.0
Remember to add `{:mint, "~> 1.0"}` and `{:castore, "~> 0.1"}` to dependencies
Also, you need to recompile tesla after adding `:mint` dependency:
```
mix deps.clean tesla
mix deps.compile tesla
```
### Example usage
```
# set globally in config/config.exs
config :tesla, :adapter, Tesla.Adapter.Mint
# set per module
defmodule MyClient do
use Tesla
adapter Tesla.Adapter.Mint
end
```
### Adapter specific options:
* `timeout` - How long (in milliseconds) the process waits for a message from mint. Defaults to `2_000`.
* `body_as` - What will be returned in `%Tesla.Env{}` body key. Possible values - `:plain`, `:stream`, `:chunks`. Defaults to `:plain`.
* `:plain` - as binary.
* `:stream` - as stream. If you don't want to close connection (because you want to reuse it later) pass `close_conn: false` in adapter opts.
* `:chunks` - as chunks. You can get response body in chunks using `Tesla.Adapter.Mint.read_chunk/3` function.
Processing of the chunks and checking body size must be done by yourself. Example of processing function is in `test/tesla/adapter/mint_test.exs` - `Tesla.Adapter.MintTest.read_body/4`. If you don't need connection later don't forget to close it with `Tesla.Adapter.Mint.close/1`.
* `max_body` - Max response body size in bytes. Works only with `body_as: :plain`, with other settings you need to check response body size by yourself.
* `conn` - Opened connection with mint. Is used for reusing mint connections.
* `original` - Original host with port, for which reused connection was open. Needed for `Tesla.Middleware.FollowRedirects`. Otherwise adapter will use connection for another open host.
* `close_conn` - Close connection or not after receiving full response body. Is used for reusing mint connections. Defaults to `true`.
* `proxy` - Proxy settings. E.g.: `{:http, "localhost", 8888, []}`, `{:http, "127.0.0.1", 8888, []}`
"""
@behaviour Tesla.Adapter
import Tesla.Adapter.Shared
alias Tesla.Multipart
alias Mint.HTTP
@default timeout: 2_000, body_as: :plain, close_conn: true, mode: :active
@impl true
def call(env, opts) do
  # Effective options are built by Tesla.Adapter.opts/3 from @default, env and opts.
  effective_opts = Tesla.Adapter.opts(@default, env, opts)

  case request(env, effective_opts) do
    {:ok, status, headers, body} ->
      {:ok, %{env | status: status, headers: headers, body: body}}

    # Any other result from the lower layers is handed back untouched.
    other ->
      other
  end
end
@doc """
Reads the next chunk of the response body.

Returns `{:fin, conn, data}` once the whole body has been received and
`{:nofin, conn, data}` otherwise. If receiving the packet fails, the
`{:error, reason}` tuple is returned unchanged.

`opts` may be a map (as found under the `:opts` key of a `body_as: :chunks`
response body) or a keyword list; it must contain `:mode` and `:timeout`.
When the body is finished and `opts[:close_conn]` is truthy, the connection
is closed before it is returned.
"""
@spec read_chunk(HTTP.t(), reference(), keyword() | map()) ::
        {:fin, HTTP.t(), binary()} | {:nofin, HTTP.t(), binary()} | {:error, term()}
def read_chunk(conn, ref, opts) do
  # The receive helpers pattern-match on a map, so a keyword list is converted first.
  opts = Map.new(opts)

  with {:ok, conn, acc} <- receive_packet(conn, ref, opts) do
    {state, data} = response_state(acc)

    {:ok, conn} =
      if state == :fin and opts[:close_conn] do
        close(conn)
      else
        {:ok, conn}
      end

    {state, conn, data}
  end
end
@doc """
Closes the given mint connection by calling `Mint.HTTP.close/1`.
"""
@spec close(HTTP.t()) :: {:ok, HTTP.t()}
def close(conn), do: HTTP.close(conn)
# Unpacks the env into method, full URL, headers and body, and turns the
# option list into a map before dispatching on the body type.
defp request(env, opts) do
  method = format_method(env.method)
  url = Tesla.build_url(env.url, env.query)
  opts_map = Map.new(opts)

  request(method, url, env.headers, env.body, opts_map)
end
# A stream body is converted to a function via stream_to_fun/1 and dispatched again.
defp request(method, url, headers, %Stream{} = body, opts),
  do: request(method, url, headers, stream_to_fun(body), opts)

# A multipart body contributes its own headers and is sent as a function-backed body.
defp request(method, url, headers, %Multipart{} = body, opts) do
  all_headers = headers ++ Multipart.headers(body)
  body_fun = body |> Multipart.body() |> stream_to_fun()

  request(method, url, all_headers, body_fun, opts)
end

# Every other body is passed straight through.
defp request(method, url, headers, body, opts) do
  do_request(method, url, headers, body, opts)
end
# Parses the URL, obtains a connection (reused or new), sends the request and
# formats the response according to opts. Non-matching results from
# open_conn/2 or make_request/5 are returned as they are.
defp do_request(method, url, headers, body, opts) do
  uri = URI.parse(url)
  path = prepare_path(uri.path, uri.query)
  opts = check_original(uri, opts)

  with {:ok, conn, opts} <- open_conn(uri, opts),
       {:ok, conn, ref} <- make_request(conn, method, path, headers, body) do
    format_response(conn, ref, opts)
  end
end
# When opts carries :original, records under :original_matches whether it
# equals "host:port" of the requested URI. Without :original, opts is returned
# unchanged.
defp check_original(uri, opts) do
  case opts do
    %{original: original} ->
      requested = "#{uri.host}:#{uri.port}"
      Map.put(opts, :original_matches, original == requested)

    _ ->
      opts
  end
end
# Returns the connection to use for the request, together with possibly updated opts.
#
# A caller-supplied connection is reused only when :original_matches is true.
# That key is set by check_original/2 only when :original is passed, so a :conn
# given without :original falls through to the last clause and a new connection is opened.
defp open_conn(_uri, %{conn: conn, original_matches: true} = opts) do
{:ok, conn, opts}
end
# The supplied connection belongs to a different host: keep it under :old_conn
# (an already present :old_conn is not overwritten), drop :conn and open a new connection.
defp open_conn(uri, %{conn: conn, original_matches: false} = opts) do
opts =
opts
|> Map.put_new(:old_conn, conn)
|> Map.delete(:conn)
open_conn(uri, opts)
end
# Opens a new connection. For "https" URIs a :cacert value configured for
# Tesla.Adapter.Mint becomes the default :cacertfile in :transport_opts;
# a :cacertfile passed explicitly wins because Keyword.put_new/3 is used.
defp open_conn(uri, opts) do
+ opts =
+ with "https" <- uri.scheme,
+ global_cacertfile when not is_nil(global_cacertfile) <-
+ Application.get_env(:tesla, Tesla.Adapter.Mint)[:cacert] do
+ Map.update(opts, :transport_opts, [cacertfile: global_cacertfile], fn tr_opts ->
+ Keyword.put_new(tr_opts, :cacertfile, global_cacertfile)
+ end)
+ else
+ _ -> opts
+ end
+
# The whole opts map, adapter-specific keys included, is handed to HTTP.connect/4 as a list.
# NOTE(review): String.to_atom/1 creates an atom from the URL scheme; confirm the scheme is restricted upstream.
with {:ok, conn} <-
HTTP.connect(String.to_atom(uri.scheme), uri.host, uri.port, Enum.into(opts, [])) do
# If there were redirects and `close_conn: false` was passed, we need to close the connections opened to these intermediate hosts.
{:ok, conn, Map.put(opts, :close_conn, true)}
end
end
# A function body is sent in streaming mode: the request is opened with
# :stream and the chunks are pushed by stream_request/3.
defp make_request(conn, method, path, headers, body_fun) when is_function(body_fun) do
  with {:ok, conn, ref} <- HTTP.request(conn, method, path, headers, :stream),
       {:ok, conn} <- stream_request(conn, ref, body_fun) do
    {:ok, conn, ref}
  end
end

# Any other body is passed to Mint in a single call.
defp make_request(conn, method, path, headers, body) do
  HTTP.request(conn, method, path, headers, body)
end
# Streams the request body chunk by chunk until the producer reports :eof.
#
# `fun` is advanced with next_chunk/1; a list chunk is converted to a binary
# with List.to_string/1 before it is handed to Mint.
#
# Returns the result of HTTP.stream_request_body/3 for the final :eof call.
# If an intermediate chunk fails, that result is returned as-is instead of
# raising a MatchError, which is how the :eof branch already reports failures.
defp stream_request(conn, ref, fun) do
  case next_chunk(fun) do
    {:ok, item, fun} ->
      chunk = if is_list(item), do: List.to_string(item), else: item

      with {:ok, conn} <- HTTP.stream_request_body(conn, ref, chunk) do
        stream_request(conn, ref, fun)
      end

    :eof ->
      HTTP.stream_request_body(conn, ref, :eof)
  end
end
# body_as: :plain - collect the whole response and return the body as received data.
defp format_response(conn, ref, %{body_as: :plain} = opts) do
  case receive_responses(conn, ref, opts) do
    {:ok, response} ->
      {:ok, response[:status], response[:headers], response[:data]}

    other ->
      other
  end
end

# body_as: :chunks - return as soon as status and headers are known. The body
# is a map holding the connection, the request ref, the opts and whatever part
# of the body has already arrived, tagged :fin or :nofin.
defp format_response(conn, ref, %{body_as: :chunks} = opts) do
  with {:ok, conn, %{status: status, headers: headers} = acc} <-
         receive_headers_and_status(conn, ref, opts) do
    {state, data} = response_state(acc)
    close_now = state == :fin and opts[:close_conn]

    {:ok, conn} = if close_now, do: close(conn), else: {:ok, conn}

    {:ok, status, headers, %{conn: conn, ref: ref, opts: opts, body: {state, data}}}
  end
end
# body_as: :stream - return status and headers together with a lazy stream of
# body chunks built with Stream.resource/3.
#
# The stream state is a map that always contains :conn. It starts with the
# data and done flag that arrived together with the headers, then keeps
# calling receive_packet/4 until the response is done. When the stream
# terminates, the connection is closed if opts[:close_conn] is truthy.
defp format_response(conn, ref, %{body_as: :stream} = opts) do
  # there can be some data already
  with {:ok, conn, %{status: status, headers: headers} = acc} <-
         receive_headers_and_status(conn, ref, opts) do
    body_as_stream =
      Stream.resource(
        fn -> %{conn: conn, data: acc[:data], done: acc[:done]} end,
        fn
          # The response finished together with the headers and carried no
          # body: emit nothing instead of a `nil` element.
          %{conn: conn, data: nil, done: true} ->
            {[], %{conn: conn, is_fin: true}}

          %{conn: conn, data: data, done: true} ->
            {[data], %{conn: conn, is_fin: true}}

          %{conn: conn, data: data} when is_binary(data) ->
            {[data], %{conn: conn}}

          %{conn: conn, is_fin: true} ->
            {:halt, %{conn: conn}}

          %{conn: conn} ->
            case receive_packet(conn, ref, opts) do
              {:ok, conn, %{done: true, data: data}} ->
                {[data], %{conn: conn, is_fin: true}}

              {:ok, conn, %{done: true}} ->
                {[], %{conn: conn, is_fin: true}}

              {:ok, conn, %{data: data}} ->
                {[data], %{conn: conn}}

              {:ok, conn, _} ->
                {[], %{conn: conn}}
            end
        end,
        fn %{conn: conn} -> if opts[:close_conn], do: {:ok, _conn} = close(conn) end
      )

    {:ok, status, headers, body_as_stream}
  end
end
# Keeps receiving packets into acc until the response is marked done, checking
# the accumulated body size after every packet. Returns {:ok, acc} when done;
# errors from receive_packet/4 or check_data_size/3 are returned unchanged.
defp receive_responses(conn, ref, opts, acc \\ %{}) do
  with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc),
       :ok <- check_data_size(acc, conn, opts) do
    cond do
      !acc[:done] ->
        receive_responses(conn, ref, opts, acc)

      opts[:close_conn] ->
        {:ok, _conn} = close(conn)
        {:ok, acc}

      true ->
        {:ok, acc}
    end
  end
end
# Enforces :max_body on the data accumulated so far. Returns :ok while the
# body fits, otherwise {:error, :body_too_large}, closing the connection first
# when opts[:close_conn] is truthy.
defp check_data_size(%{data: data}, conn, %{max_body: max_body} = opts)
     when is_binary(data) do
  remaining = max_body - byte_size(data)

  cond do
    remaining >= 0 ->
      :ok

    opts[:close_conn] ->
      {:ok, _conn} = close(conn)
      {:error, :body_too_large}

    true ->
      {:error, :body_too_large}
  end
end

# No binary data yet or no :max_body configured: nothing to check.
defp check_data_size(_acc, _conn, _opts), do: :ok
# Receives packets until the accumulator holds both :status and :headers.
# Errors from receive_packet/4 are returned unchanged.
defp receive_headers_and_status(conn, ref, opts, acc \\ %{}) do
  case receive_packet(conn, ref, opts, acc) do
    {:ok, conn, %{status: _status, headers: _headers} = acc} ->
      {:ok, conn, acc}

    # status or headers are still missing - try to get them in the next packet
    {:ok, conn, acc} ->
      receive_headers_and_status(conn, ref, opts, acc)

    other ->
      other
  end
end
# Summarises an accumulator as {:fin | :nofin, data}: :fin when it is marked
# done, and "" as data when no :data key is present.
defp response_state(acc) do
  tag =
    case acc do
      %{done: true} -> :fin
      _ -> :nofin
    end

  data =
    case acc do
      %{data: data} -> data
      _ -> ""
    end

  {tag, data}
end
# Receives one message from the connection and folds its responses into acc.
# On failure the connection is closed when opts[:close_conn] is truthy and an
# {:error, reason} tuple is returned.
defp receive_packet(conn, ref, opts, acc \\ %{}) do
  with {:ok, conn, responses} <- receive_message(conn, opts) do
    {:ok, conn, reduce_responses(responses, ref, acc)}
  else
    {:error, reason} ->
      if opts[:close_conn] do
        {:ok, _conn} = close(conn)
      end

      {:error, reason}

    {:error, _conn, mint_error, _responses} ->
      if opts[:close_conn] do
        {:ok, _conn} = close(conn)
      end

      {:error, "Encounter Mint error #{inspect(mint_error)}"}

    :unknown ->
      if opts[:close_conn] do
        {:ok, _conn} = close(conn)
      end

      {:error, :unknown}
  end
end
# Active mode: take the next message from the process mailbox and let Mint
# decode it, giving up with {:error, :timeout} after opts[:timeout] milliseconds.
# NOTE(review): the receive matches any message, not only ones for this connection.
defp receive_message(conn, %{mode: :active} = opts) do
  wait_ms = opts[:timeout]

  receive do
    message -> HTTP.stream(conn, message)
  after
    wait_ms -> {:error, :timeout}
  end
end

# Passive mode: read from the connection with HTTP.recv/3.
defp receive_message(conn, %{mode: :passive} = opts) do
  HTTP.recv(conn, 0, opts[:timeout])
end
# Folds the responses belonging to request `ref` into the accumulator map:
# :status is overwritten, :headers and :data are appended to what is already
# there, and :done is set to true once the response is complete.
defp reduce_responses(responses, ref, acc) do
  Enum.reduce(responses, acc, fn
    {:status, ^ref, code}, result ->
      Map.put(result, :status, code)

    {:headers, ^ref, headers}, result ->
      Map.update(result, :headers, headers, fn existing -> existing ++ headers end)

    {:data, ^ref, data}, result ->
      Map.update(result, :data, data, fn existing -> existing <> data end)

    {:done, ^ref}, result ->
      Map.put(result, :done, true)
  end)
end
end
end
diff --git a/test/tesla/adapter/mint_test.exs b/test/tesla/adapter/mint_test.exs
index 27094dc..f05debd 100644
--- a/test/tesla/adapter/mint_test.exs
+++ b/test/tesla/adapter/mint_test.exs
@@ -1,286 +1,285 @@
defmodule Tesla.Adapter.MintTest do
use ExUnit.Case
use Tesla.AdapterCase, adapter: Tesla.Adapter.Mint
use Tesla.AdapterCase.Basic
use Tesla.AdapterCase.Multipart
use Tesla.AdapterCase.StreamRequestBody
- use Tesla.AdapterCase.SSL,
- transport_opts: [cacertfile: "./deps/httparrot/priv/ssl/server-ca.crt"]
+ use Tesla.AdapterCase.SSL
test "Delay request" do
  # A 100 ms adapter timeout is expected to expire before the /delay/1 endpoint answers.
  env = %Env{method: :head, url: "#{@http}/delay/1"}

  assert {:error, :timeout} = call(env, timeout: 100)
end
test "max_body option" do
  # The response body is larger than the 5 byte limit, so the adapter must give up.
  env = %Env{method: :get, url: "#{@http}/stream-bytes/100"}

  assert {:error, :body_too_large} = call(env, max_body: 5)
end
test "response body as stream" do
  env = %Env{method: :get, url: "#{@http}/stream-bytes/1500"}

  assert {:ok, %Env{} = response} = call(env, body_as: :stream)
  assert response.status == 200
  # With body_as: :stream the body is a lazy stream, i.e. a function.
  assert is_function(response.body)
  assert byte_size(Enum.join(response.body)) == 2245
end
test "response body as chunks with closing body with default" do
  env = %Env{method: :get, url: "#{@http}/stream-bytes/10"}

  assert {:ok, %Env{} = response} = call(env, body_as: :chunks)
  assert response.status == 200

  %{conn: conn, ref: ref, opts: opts, body: body} = response.body
  assert opts[:body_as] == :chunks

  # No close_conn option was passed, so the connection is expected to be
  # closed once the whole body has been read.
  {:ok, conn, received_body} = read_body(conn, ref, opts, body)
  assert byte_size(received_body) == 16
  assert conn.state == :closed
end
test "certificates_verification" do
  env = %Env{method: :get, url: "#{@https}/stream-bytes/10"}

  # verify_fun that reports every certificate as valid
  accept_any = fn _cert, _reason, state -> {:valid, state} end

  assert {:ok, %Env{} = response} =
           call(env,
             certificates_verification: true,
             transport_opts: [verify_fun: {accept_any, nil}]
           )

  assert response.status == 200
  assert byte_size(response.body) == 16
end
describe "mode: :passive" do
  test "body_as: :plain" do
    env = %Env{method: :get, url: "#{@http}/stream-bytes/10"}

    assert {:ok, %Env{} = response} = call(env, mode: :passive)
    assert response.status == 200
    assert byte_size(response.body) == 16
  end

  test "body_as: :stream" do
    env = %Env{method: :get, url: "#{@http}/stream-bytes/10"}

    assert {:ok, %Env{} = response} = call(env, body_as: :stream, mode: :passive)
    assert response.status == 200
    assert byte_size(Enum.join(response.body)) == 16
  end

  test "body_as: :chunks" do
    env = %Env{method: :get, url: "#{@http}/stream-bytes/10"}

    assert {:ok, %Env{} = response} = call(env, body_as: :chunks, mode: :passive)
    assert response.status == 200

    %{conn: conn, ref: ref, opts: opts, body: body} = response.body
    {:ok, _conn, received_body} = read_body(conn, ref, opts, body)
    assert byte_size(received_body) == 16
  end
end
describe "500 error" do
  # A 500 response is still a successful HTTP exchange for every body mode.
  test "body_as :plain" do
    env = %Env{method: :get, url: "#{@http}/status/500"}

    assert {:ok, %Env{} = response} = call(env)
    assert response.status == 500
  end

  test "body_as :stream" do
    env = %Env{method: :get, url: "#{@http}/status/500"}

    assert {:ok, %Env{} = response} = call(env, body_as: :stream)
    assert response.status == 500
  end

  test "body_as :chunks" do
    env = %Env{method: :get, url: "#{@http}/status/500"}

    assert {:ok, %Env{} = response} = call(env, body_as: :chunks)
    assert response.status == 500
  end
end
describe "reusing connection" do
  # Every test gets a freshly opened plain-HTTP connection and the matching
  # "host:port" string for the :original option.
  setup do
    %URI{host: host, port: port} = URI.parse(@http)
    {:ok, conn} = Mint.HTTP.connect(:http, host, port)

    {:ok, conn: conn, original: "#{host}:#{port}"}
  end

  test "body_as :plain", %{conn: conn, original: original} do
    env = %Env{method: :get, url: "#{@http}/stream-bytes/10"}
    reuse_opts = [conn: conn, original: original, close_conn: false]

    # Two consecutive requests are sent over the same pre-opened connection.
    for _attempt <- 1..2 do
      assert {:ok, %Env{} = response} = call(env, reuse_opts)
      assert response.status == 200
      assert byte_size(response.body) == 16
    end

    assert {:ok, conn} = Tesla.Adapter.Mint.close(conn)
    assert conn.state == :closed
  end

  test "body_as :stream", %{conn: conn, original: original} do
    env = %Env{method: :get, url: "#{@http}/stream-bytes/10"}
    reuse_opts = [conn: conn, original: original, close_conn: false, body_as: :stream]

    # Two consecutive requests are sent over the same pre-opened connection.
    for _attempt <- 1..2 do
      assert {:ok, %Env{} = response} = call(env, reuse_opts)
      assert response.status == 200
      assert is_function(response.body)
      assert byte_size(Enum.join(response.body)) == 16
    end

    assert {:ok, conn} = Tesla.Adapter.Mint.close(conn)
    assert conn.state == :closed
  end

  test "body_as :chunks", %{conn: conn, original: original} do
    env = %Env{method: :get, url: "#{@http}/stream-bytes/10"}

    # First request over the connection from setup.
    assert {:ok, %Env{} = response} =
             call(env, conn: conn, original: original, close_conn: false, body_as: :chunks)

    assert response.status == 200
    assert %{conn: received_conn, ref: ref, opts: opts, body: body} = response.body
    {:ok, conn, received_body} = read_body(received_conn, ref, opts, body)
    assert byte_size(received_body) == 16
    assert conn.socket == received_conn.socket

    # Second request over the connection returned by read_body/4.
    assert {:ok, %Env{} = response} =
             call(env, conn: conn, original: original, close_conn: false, body_as: :chunks)

    assert response.status == 200
    assert %{conn: received_conn, ref: ref, opts: opts, body: body} = response.body
    {:ok, conn, received_body} = read_body(received_conn, ref, opts, body)
    assert byte_size(received_body) == 16
    assert conn.socket == received_conn.socket

    {:ok, conn} = Tesla.Adapter.Mint.close(received_conn)
    assert conn.state == :closed
  end

  test "don't reuse connection if original does not match", %{conn: conn} do
    env = %Env{method: :get, url: "#{@http}/stream-bytes/10"}

    assert {:ok, %Env{} = response} =
             call(env, body_as: :chunks, conn: conn, original: "example.com:80")

    assert response.status == 200

    %{conn: received_conn, ref: ref, opts: opts, body: body} = response.body
    {:ok, received_conn, received_body} = read_body(received_conn, ref, opts, body)
    assert byte_size(received_body) == 16

    # A different connection was opened; the supplied one is kept under :old_conn.
    refute conn.socket == received_conn.socket
    refute opts[:conn]
    assert opts[:old_conn].socket == conn.socket
  end
end
# Example of collecting a `body_as: :chunks` response body into one binary.
# The last argument is either the `{:fin | :nofin, data}` tuple taken from the
# response body or the binary collected so far.
def read_body(conn, _ref, _opts, {:fin, body}), do: {:ok, conn, body}

def read_body(conn, ref, opts, {:nofin, partial}) do
  read_body(conn, ref, opts, partial)
end

def read_body(conn, ref, opts, collected) do
  case Tesla.Adapter.Mint.read_chunk(conn, ref, opts) do
    {:nofin, conn, part} ->
      read_body(conn, ref, opts, collected <> part)

    {:fin, conn, last} ->
      {:ok, conn, collected <> last}
  end
end
end

File Metadata

Mime Type
text/x-diff
Expires
Tue, Nov 26, 12:58 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40152
Default Alt Text
(21 KB)

Event Timeline