Page MenuHomePhorge

No OneTemporary

Size
6 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/concurrent_limiter.ex b/lib/concurrent_limiter.ex
index 9776377..b4572c6 100644
--- a/lib/concurrent_limiter.ex
+++ b/lib/concurrent_limiter.ex
@@ -1,152 +1,116 @@
defmodule ConcurrentLimiter do
require Logger
@moduledoc """
# Concurrent Limiter
A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes.
- It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead. As it internally uses `persistent_term` to store metadata, and can fallback to ETS tables, it is however not made for a large number of limiters and cannot be used for things like a per-user rate limiter.
-
- It supports two storage methods:
-
- * **[atomics](https://erlang.org/doc/man/atomics.html)** recommended and default if your OTP is > 21.2.
- * **[ets](https://erlang.org/doc/man/ets.html)** either with a single table per limiter (faster) or a shared table.
-
- You would almost always want to use atomics, ets is mostly there for backwards compatibility.
+ It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead.
+ As it internally uses `persistent_term` to store metadata, and can fallback to ETS tables, it is however not made for a large number
+ of limiters and cannot be used for things like a per-user rate limiter.
"""
@doc """
Initializes a `ConcurrentLimiter`.
"""
+ @default_wait 150
+ @default_max_retries 5
+
@spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing} when name: atom(),
max_running: non_neg_integer(),
max_waiting: non_neg_integer() | :infinity,
options: [option],
- option: {:wait, non_neg_integer()} | backend,
- backend: :atomics | ets_backend,
- ets_backend: :ets | {:ets, atom()} | {:ets, atom(), ets_options :: []}
+ option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
def new(name, max_running, max_waiting, options \\ []) do
name = prefix_name(name)
if defined?(name) do
{:error, :existing}
else
- wait = Keyword.get(options, :wait, 150)
- backend = Keyword.get(options, :backend, default_backend())
- {:ok, backend} = setup_backend(backend)
- :persistent_term.put(name, {__MODULE__, max_running, max_waiting, backend, wait})
+ wait = Keyword.get(options, :wait, @default_wait)
+ max_retries = Keyword.get(options, :max_retries, @default_max_retries)
+ atomics = :atomics.new(1, [signed: true])
+ :persistent_term.put(name, {__MODULE__, max_running, max_waiting, atomics, wait, max_retries})
:ok
end
end
@spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error when name: atom(),
new_max_running: non_neg_integer(),
new_max_waiting: non_neg_integer() | :infinity,
options: [option],
option: {:wait, non_neg_integer()}
@doc "Adjust the limiter limits at runtime"
def set(name, new_max_running, new_max_waiting, options \\ []) do
name = prefix_name(name)
if defined?(name) do
new_wait = Keyword.get(options, :wait)
- {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name)
- new = {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, backend, new_wait || wait}
+ new_max_retries = Keyword.get(options, :max_retries)
+ {__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name)
+ new = {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, ref, new_wait || wait, new_max_retries || max_retries}
:persistent_term.put(name, new)
:ok
else
:error
end
end
- @spec limit(atom(), function()) :: {:error, :overload} | any()
+ @spec limit(atom(), function(), opts) :: {:error, :overload} | any() when opts: [option],
+ option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
@doc "Limits invocation of `fun`."
- def limit(name, fun) do
- do_limit(prefix_name(name), fun)
+ def limit(name, fun, opts \\ []) do
+ do_limit(prefix_name(name), fun, opts, 0)
end
- defp do_limit(name, fun) do
- {__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name)
+ defp do_limit(name, fun, opts, retries) do
+ {__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name)
max = max_running + max_waiting
- counter = inc(backend, name)
+ counter = inc(ref, name)
+ max_retries = Keyword.get(opts, :max_retries) || max_retries
cond do
counter <= max_running ->
try do
fun.()
after
- dec(backend, name)
+ dec(ref, name)
end
counter > max ->
- dec(backend, name)
+ dec(ref, name)
+ {:error, :overload}
+
+ retries + 1 > max_retries ->
{:error, :overload}
counter > max_running ->
- wait(backend, name, wait, fun)
+ wait(ref, name, wait, fun, opts, max_retries, retries + 1)
end
end
- defp wait(backend, name, wait, fun) do
+ defp wait(ref, name, wait, fun, opts, max_retries, retries) do
+ wait = Keyword.get(opts, :timeout) || wait
Process.sleep(wait)
- dec(backend, name)
- do_limit(name, fun)
+ dec(ref, name)
+ do_limit(name, fun, opts, retries)
end
- defp inc({:ets, ets}, name) do
- :ets.update_counter(ets, name, {2, 1}, {name, 0})
- end
-
- defp inc({:atomics, ref}, _) do
+ defp inc(ref, _) do
:atomics.add_get(ref, 1, 1)
end
- defp dec({:ets, ets}, name) do
- :ets.update_counter(ets, name, {2, -1}, {name, 0})
- end
-
- defp dec({:atomics, ref}, _) do
+ defp dec(ref, _) do
:atomics.sub_get(ref, 1, 1)
end
defp prefix_name(suffix), do: Module.concat(__MODULE__, suffix)
defp defined?(name) do
{__MODULE__, _, _, _, _, _} = :persistent_term.get(name)
true
rescue
_ -> false
end
- defp default_backend() do
- if Code.ensure_loaded?(:atomics) do
- :atomics
- else
- Logger.debug("ConcurrentLimiter: atomics not available, using ETS backend")
- :ets
- end
- end
-
- defp setup_backend(:ets) do
- setup_backend({:ets, ETS})
- end
-
- defp setup_backend({:ets, name}) do
- setup_backend({:ets, name, [{:write_concurrency, true}, {:read_concurrency, true}]})
- end
-
- defp setup_backend({:ets, name, options}) do
- ets_name = prefix_name(name)
-
- case :ets.whereis(ets_name) do
- :undefined -> :ets.new(ets_name, [:public, :named_table] ++ options)
- _ -> nil
- end
- {:ok, {:ets, ets_name}}
- end
-
- defp setup_backend(:atomics) do
- {:ok, {:atomics, :atomics.new(1, [signed: true])}}
- end
-
end

File Metadata

Mime Type
text/x-diff
Expires
Tue, Jan 21, 12:50 PM (1 d, 5 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
55716
Default Alt Text
(6 KB)

Event Timeline