Page MenuHomePhorge

No OneTemporary

Size
9 KB
Referenced Files
None
Subscribers
None
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 792ecbf..b399b8e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,9 +1,19 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
+## [Unreleased]
+
+### Added
+
+- Telemetry events.
+
+### Fixed
+
+- Decrement the counter when the maximum number of retries has been reached.
+
## [0.1.0] - 2020-05-16
Initial release.
diff --git a/lib/concurrent_limiter.ex b/lib/concurrent_limiter.ex
index a581447..e7b6b55 100644
--- a/lib/concurrent_limiter.ex
+++ b/lib/concurrent_limiter.ex
@@ -1,144 +1,150 @@
# ConcurrentLimiter: A concurrency limiter.
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: LGPL-3.0-only
defmodule ConcurrentLimiter do
require Logger
@moduledoc """
A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes.
It is useful when you don't need a worker pool but still want to limit concurrent calls without much overhead.
As it internally uses `persistent_term` to store metadata, it is not made for a large number of different or dynamic limiters and
cannot be used for things like a per-user rate limiter.
```elixir
:ok = ConcurrentLimiter.new(RequestLimiter, 10, 10)
ConcurrentLimiter.limit(RequestLimiter, fn() -> something_that_can_only_run_ten_times_concurrently() end)
```
"""
@default_wait 150
@default_max_retries 5
@doc """
Registers a new limiter under `name`.

The limits, a fresh single-slot signed atomics counter, and the `:wait` and
`:max_retries` options (falling back to the module defaults when omitted) are
stored together in `:persistent_term`.

Returns `:ok`, or `{:error, :existing}` when a limiter with that name is
already defined.
"""
@spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing}
      when name: atom(),
           max_running: non_neg_integer(),
           max_waiting: non_neg_integer() | :infinity,
           options: [option],
           option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
def new(name, max_running, max_waiting, options \\ []) do
  key = prefix_name(name)

  case defined?(key) do
    true ->
      {:error, :existing}

    false ->
      wait_time = Keyword.get(options, :wait, @default_wait)
      retry_cap = Keyword.get(options, :max_retries, @default_max_retries)
      counter = :atomics.new(1, signed: true)

      :persistent_term.put(
        key,
        {__MODULE__, max_running, max_waiting, counter, wait_time, retry_cap}
      )

      :ok
  end
end
@doc """
Adjusts the limits of an existing limiter at runtime.

Passing `nil` for `new_max_running` or `new_max_waiting`, or leaving out the
`:wait` / `:max_retries` options, keeps the value currently stored for that
setting. The limiter's counter is reused unchanged.

Returns `:ok`, or `:error` when no limiter named `name` is defined.
"""
@spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error
      when name: atom(),
           new_max_running: non_neg_integer() | nil,
           new_max_waiting: non_neg_integer() | :infinity | nil,
           options: [option],
           option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
def set(name, new_max_running, new_max_waiting, options \\ []) do
  name = prefix_name(name)

  if defined?(name) do
    new_wait = Keyword.get(options, :wait)
    new_max_retries = Keyword.get(options, :max_retries)

    {__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name)

    # Each setting falls back to its stored value when the new one is nil.
    updated =
      {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, ref,
       new_wait || wait, new_max_retries || max_retries}

    :persistent_term.put(name, updated)
    :ok
  else
    :error
  end
end
@doc """
Deletes a limiter.

Always returns `:ok`, whether or not a limiter named `name` was defined.
"""
@spec delete(name) :: :ok when name: atom()
def delete(name) do
  # Limiters are stored under the prefixed key (see `new/4`, `set/4`, `limit/3`).
  # Looking up the bare name never matches, so the limiter would never be removed.
  key = prefix_name(name)

  if defined?(key) do
    # Erase the entry instead of overwriting it with nil so no stale key is left behind.
    :persistent_term.erase(key)
  end

  :ok
end
@doc """
Runs `fun` under the limiter registered as `name`.

Returns whatever `fun` returns. Returns `{:error, :overload}` instead when the
running and waiting capacity are both exhausted, or when the call has used up
its retries while waiting for a free slot.
"""
@spec limit(name, function(), opts) :: {:error, :overload} | any()
      when name: atom(),
           opts: [option],
           option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
def limit(name, fun, opts \\ []) do
  name
  |> prefix_name()
  |> do_limit(fun, opts, 0)
end
# Attempts to run `fun` within the limits stored under `name` (the prefixed key).
#
# The shared counter is incremented on entry; every outcome gives the slot back:
#   * a running slot is available -> run `fun`, decrement afterwards (also if it raises);
#   * running + waiting are full  -> `{:error, :overload}`;
#   * retries are exhausted       -> `{:error, :overload}`;
#   * otherwise                   -> keep the slot while sleeping in `wait/6`, then retry.
#
# A telemetry event is emitted for the attempt itself and for the branch taken.
defp do_limit(name, fun, opts, retries) do
  {__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name)

  # `max_waiting` may be `:infinity`; adding it to an integer raises ArithmeticError.
  # Keeping the atom is safe: integers sort below atoms in term order, so
  # `counter > :infinity` is never true and the overload branch is simply never taken.
  max =
    case max_waiting do
      :infinity -> :infinity
      _ -> max_running + max_waiting
    end

  # Read the options before touching the counter so malformed opts cannot leak a slot.
  max_retries = Keyword.get(opts, :max_retries) || max_retries
  counter = inc(ref, name)

  :telemetry.execute([:concurrent_limiter, :limit], %{counter: counter}, %{limiter: name})

  cond do
    counter <= max_running ->
      :telemetry.execute([:concurrent_limiter, :execution], %{counter: counter}, %{
        limiter: name
      })

      try do
        fun.()
      after
        dec(ref, name)
      end

    counter > max ->
      :telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{
        limiter: name,
        scope: "max"
      })

      dec(ref, name)
      {:error, :overload}

    retries + 1 > max_retries ->
      :telemetry.execute([:concurrent_limiter, :max_retries], %{counter: counter}, %{
        limiter: name,
        retries: retries + 1
      })

      dec(ref, name)
      {:error, :overload}

    counter > max_running ->
      :telemetry.execute([:concurrent_limiter, :wait], %{counter: counter}, %{
        limiter: name,
        retries: retries + 1
      })

      wait(ref, name, fun, wait, opts, retries + 1)
  end
end
# Sleeps while still holding the slot taken by the caller, then releases it and retries.
#
# The per-call delay is read from the `:wait` option, which is the name documented in
# the `limit/3` spec. `:timeout` is still honoured as a fallback because this function
# previously read only that key. Without either, the limiter's stored wait is used.
defp wait(ref, name, fun, wait, opts, retries) do
  delay = Keyword.get(opts, :wait) || Keyword.get(opts, :timeout) || wait
  Process.sleep(delay)
  dec(ref, name)
  do_limit(name, fun, opts, retries)
end
# Atomically adds one to slot 1 of the counter and returns the updated value.
# The second argument is accepted but not used.
defp inc(ref, _name), do: :atomics.add_get(ref, 1, 1)
# Atomically subtracts one from slot 1 of the counter and returns the updated value.
# The second argument is accepted but not used.
defp dec(ref, _name), do: :atomics.sub_get(ref, 1, 1)
# Namespaces a limiter name under this module; the result is the `:persistent_term` key.
defp prefix_name(suffix) do
  Module.concat(__MODULE__, suffix)
end
# Returns true when `name` currently maps to a limiter tuple in `:persistent_term`,
# false for unknown keys or any value of another shape.
defp defined?(name) do
  # Supplying a default avoids the ArgumentError raised for unknown keys, so no
  # blanket `rescue` is needed and unrelated errors are no longer swallowed.
  match?({__MODULE__, _, _, _, _, _}, :persistent_term.get(name, nil))
end
end
diff --git a/mix.exs b/mix.exs
index ef7bb2c..237b2b9 100644
--- a/mix.exs
+++ b/mix.exs
@@ -1,46 +1,47 @@
# Mix project definition for the `concurrent_limiter` package.
defmodule ConcurrentLimiter.MixProject do
use Mix.Project
# Repository URL; reused below for the docs source/homepage links and the package links.
@repo "https://git.pleroma.social/pleroma/elixir-libraries/concurrent_limiter"
# Project configuration: application name, version, Elixir requirement, dependencies,
# package metadata and documentation settings.
def project do
[
app: :concurrent_limiter,
description: "A concurrency limiter",
version: "0.1.0",
elixir: "~> 1.9",
# Only start the application in permanent mode for the :prod environment.
start_permanent: Mix.env() == :prod,
deps: deps(),
package: package(),
# Docs
name: "Concurrent Limiter",
source_url: @repo,
homepage_url: @repo,
docs: [
main: "ConcurrentLimiter",
extras: [],
source_url_pattern: @repo <> "/blob/master/%{path}#L%{line}"
]
]
end
# OTP application configuration; only adds :logger as an extra application.
def application do
[
extra_applications: [:logger]
]
end
# Dependency list. Only :telemetry has no `only:` restriction; the other entries are
# limited to the :dev and/or :test environments.
defp deps do
[
+ {:telemetry, "~> 0.3"},
{:credo, "~> 1.1.0", only: [:dev, :test], runtime: false},
{:ex_doc, "~> 0.21", only: :dev, runtime: false},
{:benchee, "~> 1.0", only: [:dev, :test]}
]
end
# Package metadata: license and repository link.
defp package do
[
licenses: ["LGPLv3"],
links: %{"GitLab" => @repo}
]
end
end
diff --git a/mix.lock b/mix.lock
index 98a2640..8acc217 100644
--- a/mix.lock
+++ b/mix.lock
@@ -1,12 +1,13 @@
%{
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"credo": {:hex, :credo, "1.1.5", "caec7a3cadd2e58609d7ee25b3931b129e739e070539ad1a0cd7efeeb47014f4", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d0bbd3222607ccaaac5c0340f7f525c627ae4d7aee6c8c8c108922620c5b6446"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"},
"ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"},
"jason": {:hex, :jason, "1.2.1", "12b22825e22f468c02eb3e4b9985f3d0cb8dc40b9bd704730efa11abd2708c44", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "b659b8571deedf60f79c5a608e15414085fa141344e2716fbd6988a084b5f993"},
"makeup": {:hex, :makeup, "1.0.1", "82f332e461dc6c79dbd82fbe2a9c10d48ed07146f0a478286e590c83c52010b5", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49736fe5b66a08d8575bf5321d716bac5da20c8e6b97714fec2bcd6febcfa1f8"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
+ "telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Jan 20, 10:26 PM (1 d, 17 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
55602
Default Alt Text
(9 KB)

Event Timeline