Page MenuHomePhorge

No OneTemporary

Size
7 KB
Referenced Files
None
Subscribers
None
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b399b8e..daa5288 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,19 +1,20 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased]
### Added
- Telemetry events.
### Fixed
- Decrement counter when max retries has been reached.
+- Fixes behaviour of `max_waiting = 0` with `max_size = 1`.
## [0.1.0] - 2020-05-16
Initial release.
diff --git a/lib/concurrent_limiter.ex b/lib/concurrent_limiter.ex
index 4a367ea..f871276 100644
--- a/lib/concurrent_limiter.ex
+++ b/lib/concurrent_limiter.ex
@@ -1,175 +1,182 @@
# ConcurrentLimiter: A concurrency limiter.
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: LGPL-3.0-only
defmodule ConcurrentLimiter do
require Logger
@moduledoc """
A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes.
It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead.
As it internally uses `persistent_term` to store metadata, it is not made for a large number of different or dynamic limiters and
cannot be used for things like a per-user rate limiter.
```elixir
:ok = ConcurrentLimiter.new(RequestLimiter, 10, 10)
ConcurrentLimiter.limit(RequestLimiter, fn() -> something_that_can_only_run_ten_times_concurrently() end)
```
"""
@default_wait 150
@default_max_retries 5
@doc "Initializes a `ConcurrentLimiter`."
@spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing}
when name: atom(),
max_running: non_neg_integer(),
max_waiting: non_neg_integer() | :infinity,
options: [option],
option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
def new(name, max_running, max_waiting, options \\ []) do
name = prefix_name(name)
if defined?(name) do
{:error, :existing}
else
wait = Keyword.get(options, :wait, @default_wait)
max_retries = Keyword.get(options, :max_retries, @default_max_retries)
atomics = :atomics.new(1, signed: true)
:persistent_term.put(
name,
{__MODULE__, max_running, max_waiting, atomics, wait, max_retries}
)
:ok
end
end
@doc "Adjust the limits at runtime."
@spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error
when name: atom(),
new_max_running: non_neg_integer(),
new_max_waiting: non_neg_integer() | :infinity,
options: [option],
option: {:wait, non_neg_integer()}
def set(name, new_max_running, new_max_waiting, options \\ []) do
name = prefix_name(name)
if defined?(name) do
new_wait = Keyword.get(options, :wait)
new_max_retries = Keyword.get(options, :max_retries)
{__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name)
new =
{__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, ref,
new_wait || wait, new_max_retries || max_retries}
:persistent_term.put(name, new)
:ok
else
:error
end
end
@spec delete(name) :: :ok when name: atom()
@doc "Deletes a limiter."
def delete(name) do
if defined?(name) do
:persistent_term.put(name, nil)
end
:ok
end
@doc "Limits invocation of `fun`."
@spec limit(name, function(), opts) :: {:error, :overload} | any()
when name: atom(),
opts: [option],
option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
def limit(name, fun, opts \\ []) do
do_limit(prefix_name(name), fun, opts, 0)
end
defp do_limit(name, fun, opts, retries) do
{__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name)
max = max_running + max_waiting
counter = inc(ref, name)
max_retries = Keyword.get(opts, :max_retries) || max_retries
:telemetry.execute([:concurrent_limiter, :limit], %{counter: counter}, %{limiter: name})
cond do
counter <= max_running ->
:telemetry.execute([:concurrent_limiter, :execution], %{counter: counter}, %{
limiter: name
})
Process.flag(:trap_exit, true)
try do
fun.()
after
dec(ref, name)
Process.flag(:trap_exit, false)
receive do
{:EXIT, _, reason} ->
Process.exit(self(), reason)
after
0 -> :noop
end
end
counter > max ->
:telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{
limiter: name,
scope: "max"
})
+ max_waiting == 0 ->
+ :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"})
+ dec(ref, name)
+ {:error, :overload}
+
+ counter > max ->
+ :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"})
dec(ref, name)
{:error, :overload}
retries + 1 > max_retries ->
:telemetry.execute([:concurrent_limiter, :max_retries], %{counter: counter}, %{
limiter: name,
retries: retries + 1
})
dec(ref, name)
{:error, :overload}
counter > max_running ->
:telemetry.execute([:concurrent_limiter, :wait], %{counter: counter}, %{
limiter: name,
retries: retries + 1
})
wait(ref, name, fun, wait, opts, retries + 1)
end
end
defp wait(ref, name, fun, wait, opts, retries) do
wait = Keyword.get(opts, :timeout) || wait
Process.sleep(wait)
dec(ref, name)
do_limit(name, fun, opts, retries)
end
defp inc(ref, _) do
:atomics.add_get(ref, 1, 1)
end
defp dec(ref, _) do
:atomics.sub_get(ref, 1, 1)
end
defp prefix_name(suffix), do: Module.concat(__MODULE__, suffix)
defp defined?(name) do
{__MODULE__, _, _, _, _, _} = :persistent_term.get(name)
true
rescue
_ -> false
end
end
diff --git a/test/concurrent_limiter_test.exs b/test/concurrent_limiter_test.exs
index 83e9981..74c9590 100644
--- a/test/concurrent_limiter_test.exs
+++ b/test/concurrent_limiter_test.exs
@@ -1,39 +1,50 @@
# ConcurrentLimiter: A concurrency limiter.
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: LGPL-3.0-only
defmodule ConcurrentLimiterTest do
use ExUnit.Case
doctest ConcurrentLimiter
+ test "limited to one" do
+ name = "l1"
+ ConcurrentLimiter.new(name, 1, 0, max_retries: 0)
+ endless = fn() -> :timer.sleep(10000) end
+ spawn(fn() -> ConcurrentLimiter.limit(name, endless) end)
+ :timer.sleep(5)
+ {:error, :overload} = ConcurrentLimiter.limit(name, endless)
+ {:error, :overload} = ConcurrentLimiter.limit(name, endless)
+ {:error, :overload} = ConcurrentLimiter.limit(name, endless)
+ end
+
test "limiter is atomic" do
name = "test"
ConcurrentLimiter.new(name, 2, 2)
self = self()
spawn_link(fn -> sleepy(self, name, 500) end)
spawn_link(fn -> sleepy(self, name, 500) end)
spawn_link(fn -> sleepy(self, name, 500) end)
spawn_link(fn -> sleepy(self, name, 500) end)
spawn_link(fn -> sleepy(self, name, 500) end)
assert_receive :ok, 2000
assert_receive :ok, 2000
assert_receive {:error, :overload}, 2000
assert_receive :ok, 2000
assert_receive :ok, 2000
end
defp sleepy(parent, name, duration) do
result =
ConcurrentLimiter.limit(name, fn ->
send(parent, :ok)
Process.sleep(duration)
:ok
end)
case result do
:ok -> :ok
other -> send(parent, other)
end
end
end

File Metadata

Mime Type
text/x-diff
Expires
Mon, Jan 20, 4:28 PM (1 d, 12 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
55586
Default Alt Text
(7 KB)

Event Timeline