Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F1037505
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Size
10 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index 5cab62c85..306598157 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -1,224 +1,225 @@
defmodule Pleroma.Web.Streamer do
use GenServer
require Logger
alias Pleroma.{User, Notification, Activity, Object, Repo}
alias Pleroma.Web.ActivityPub.ActivityPub
def init(args) do
{:ok, args}
end
def start_link do
spawn(fn ->
# 30 seconds
Process.sleep(1000 * 30)
GenServer.cast(__MODULE__, %{action: :ping})
end)
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def add_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
end
def remove_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
end
def stream(topic, item) do
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
end
def handle_cast(%{action: :ping}, topics) do
Map.values(topics)
|> List.flatten()
|> Enum.each(fn socket ->
Logger.debug("Sending keepalive ping")
send(socket.transport_pid, {:text, ""})
end)
spawn(fn ->
# 30 seconds
Process.sleep(1000 * 30)
GenServer.cast(__MODULE__, %{action: :ping})
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
Enum.each(recipient_topics || [], fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
push_to_socket(topics, user_topic, item)
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
author = User.get_cached_by_ap_id(item.data["actor"])
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
case ActivityPub.is_public?(item) do
true ->
Pleroma.List.get_lists_from_activity(item)
_ ->
Pleroma.List.get_lists_from_activity(item)
|> Enum.filter(fn list ->
owner = Repo.get(User, list.user_id)
- author.follower_address in owner.following
+
+ ActivityPub.visible_for_user?(item, owner)
end)
end
recipient_topics =
recipient_lists
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
Enum.each(recipient_topics || [], fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
push_to_socket(topics, list_topic, item)
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
topic = "user:#{item.user_id}"
Enum.each(topics[topic] || [], fn socket ->
json =
%{
event: "notification",
payload:
Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(
socket.assigns["user"],
item
)
|> Jason.encode!()
}
|> Jason.encode!()
send(socket.transport_pid, {:text, json})
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
Logger.debug("Trying to push to users")
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
Enum.each(recipient_topics, fn topic ->
push_to_socket(topics, topic, item)
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(topics, topic, item)
{:noreply, topics}
end
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Got new conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = List.delete(sockets_for_topic, socket)
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Removed conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(m, state) do
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state}
end
defp represent_update(%Activity{} = activity, %User{} = user) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity,
for: user
)
|> Jason.encode!()
}
|> Jason.encode!()
end
defp represent_update(%Activity{} = activity) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity
)
|> Jason.encode!()
}
|> Jason.encode!()
end
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info["blocks"] || []
parent = Object.normalize(item.data["object"])
unless is_nil(parent) or item.actor in blocks or parent.data["actor"] in blocks do
send(socket.transport_pid, {:text, represent_update(item, user)})
end
else
send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end
def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info["blocks"] || []
unless item.actor in blocks do
send(socket.transport_pid, {:text, represent_update(item, user)})
end
else
send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end
defp internal_topic(topic, socket) when topic in ~w[user direct] do
"#{topic}:#{socket.assigns[:user].id}"
end
defp internal_topic(topic, _), do: topic
end
diff --git a/test/web/streamer_test.exs b/test/web/streamer_test.exs
index 47d491d1b..df58441f0 100644
--- a/test/web/streamer_test.exs
+++ b/test/web/streamer_test.exs
@@ -1,63 +1,170 @@
defmodule Pleroma.Web.StreamerTest do
use Pleroma.DataCase
alias Pleroma.Web.Streamer
- alias Pleroma.User
+ alias Pleroma.{List, User}
alias Pleroma.Web.CommonAPI
import Pleroma.Factory
test "it sends to public" do
user = insert(:user)
other_user = insert(:user)
task =
Task.async(fn ->
assert_receive {:text, _}, 4_000
end)
fake_socket = %{
transport_pid: task.pid,
assigns: %{
user: user
}
}
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
topics = %{
"public" => [fake_socket]
}
Streamer.push_to_socket(topics, "public", activity)
Task.await(task)
end
test "it doesn't send to blocked users" do
user = insert(:user)
blocked_user = insert(:user)
{:ok, user} = User.block(user, blocked_user)
task =
Task.async(fn ->
refute_receive {:text, _}, 1_000
end)
fake_socket = %{
transport_pid: task.pid,
assigns: %{
user: user
}
}
{:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
topics = %{
"public" => [fake_socket]
}
Streamer.push_to_socket(topics, "public", activity)
Task.await(task)
end
+
+ test "it doesn't send unwanted DMs to list" do
+ user_a = insert(:user)
+ user_b = insert(:user)
+ user_c = insert(:user)
+
+ {:ok, user_a} = User.follow(user_a, user_b)
+
+ {:ok, list} = List.create("Test", user_a)
+ {:ok, list} = List.follow(list, user_b)
+
+ task =
+ Task.async(fn ->
+ refute_receive {:text, _}, 1_000
+ end)
+
+ fake_socket = %{
+ transport_pid: task.pid,
+ assigns: %{
+ user: user_a
+ }
+ }
+
+ {:ok, activity} =
+ CommonAPI.post(user_b, %{
+ "status" => "@#{user_c.nickname} Test",
+ "visibility" => "direct"
+ })
+
+ topics = %{
+ "list:#{list.id}" => [fake_socket]
+ }
+
+ Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
+
+ Task.await(task)
+ end
+
+ test "it doesn't send unwanted private posts to list" do
+ user_a = insert(:user)
+ user_b = insert(:user)
+
+ {:ok, list} = List.create("Test", user_a)
+ {:ok, list} = List.follow(list, user_b)
+
+ task =
+ Task.async(fn ->
+ refute_receive {:text, _}, 1_000
+ end)
+
+ fake_socket = %{
+ transport_pid: task.pid,
+ assigns: %{
+ user: user_a
+ }
+ }
+
+ {:ok, activity} =
+ CommonAPI.post(user_b, %{
+ "status" => "Test",
+ "visibility" => "private"
+ })
+
+ topics = %{
+ "list:#{list.id}" => [fake_socket]
+ }
+
+ Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
+
+ Task.await(task)
+ end
+
+ test "it send wanted private posts to list" do
+ user_a = insert(:user)
+ user_b = insert(:user)
+
+ {:ok, user_a} = User.follow(user_a, user_b)
+
+ {:ok, list} = List.create("Test", user_a)
+ {:ok, list} = List.follow(list, user_b)
+
+ task =
+ Task.async(fn ->
+ assert_receive {:text, _}, 1_000
+ end)
+
+ fake_socket = %{
+ transport_pid: task.pid,
+ assigns: %{
+ user: user_a
+ }
+ }
+
+ {:ok, activity} =
+ CommonAPI.post(user_b, %{
+ "status" => "Test",
+ "visibility" => "private"
+ })
+
+ topics = %{
+ "list:#{list.id}" => [fake_socket]
+ }
+
+ Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
+
+ Task.await(task)
+ end
end
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, May 14, 7:23 AM (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
158383
Default Alt Text
(10 KB)
Attached To
Mode
rPUBE pleroma-upstream
Attached
Detach File
Event Timeline
Log In to Comment