Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F1037462
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Size
17 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index f7f6d047e..4ce2e6052 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -1,556 +1,558 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.{Activity, Repo, Object, Upload, User, Notification}
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.WebFinger
alias Pleroma.Web.Federator
alias Pleroma.Web.OStatus
import Ecto.Query
import Pleroma.Web.ActivityPub.Utils
require Logger
@httpoison Application.get_env(:pleroma, :httpoison)
@instance Application.get_env(:pleroma, :instance)
@rewrite_policy Keyword.get(@instance, :rewrite_policy)
def get_recipients(data) do
(data["to"] || []) ++ (data["cc"] || [])
end
def insert(map, local \\ true) when is_map(map) do
with nil <- Activity.get_by_ap_id(map["id"]),
map <- lazy_put_activity_defaults(map),
{:ok, map} <- @rewrite_policy.filter(map),
:ok <- insert_full_object(map) do
{:ok, activity} =
Repo.insert(%Activity{
data: map,
local: local,
actor: map["actor"],
recipients: get_recipients(map)
})
Notification.create_notifications(activity)
stream_out(activity)
{:ok, activity}
else
%Activity{} = activity -> {:ok, activity}
error -> {:error, error}
end
end
def stream_out(activity) do
public = "https://www.w3.org/ns/activitystreams#Public"
if activity.data["type"] in ["Create", "Announce"] do
Pleroma.Web.Streamer.stream("user", activity)
if Enum.member?(activity.data["to"], public) do
Pleroma.Web.Streamer.stream("public", activity)
if activity.local do
Pleroma.Web.Streamer.stream("public:local", activity)
end
else
if !Enum.member?(activity.data["cc"] || [], public) &&
!Enum.member?(
activity.data["to"],
User.get_by_ap_id(activity.data["actor"]).follower_address
),
do: Pleroma.Web.Streamer.stream("direct", activity)
end
end
end
def create(%{to: to, actor: actor, context: context, object: object} = params) do
additional = params[:additional] || %{}
# only accept false as false value
local = !(params[:local] == false)
published = params[:published]
with create_data <-
make_create_data(
%{to: to, actor: actor, published: published, context: context, object: object},
additional
),
{:ok, activity} <- insert(create_data, local),
:ok <- maybe_federate(activity),
{:ok, _actor} <- User.increase_note_count(actor) do
{:ok, activity}
end
end
def accept(%{to: to, actor: actor, object: object} = params) do
# only accept false as false value
local = !(params[:local] == false)
with data <- %{"to" => to, "type" => "Accept", "actor" => actor, "object" => object},
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
# only accept false as false value
local = !(params[:local] == false)
with data <- %{
"to" => to,
"cc" => cc,
"type" => "Update",
"actor" => actor,
"object" => object
},
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
%User{ap_id: ap_id} = user,
%Object{data: %{"id" => _}} = object,
activity_id \\ nil,
local \\ true
) do
with nil <- get_existing_like(ap_id, object),
like_data <- make_like_data(user, object, activity_id),
{:ok, activity} <- insert(like_data, local),
{:ok, object} <- add_like_to_object(activity, object),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
%Activity{} = activity -> {:ok, activity, object}
error -> {:error, error}
end
end
def unlike(%User{} = actor, %Object{} = object) do
with %Activity{} = activity <- get_existing_like(actor.ap_id, object),
{:ok, _activity} <- Repo.delete(activity),
{:ok, object} <- remove_like_from_object(activity, object) do
{:ok, object}
else
_e -> {:ok, object}
end
end
def announce(
%User{ap_id: _} = user,
%Object{data: %{"id" => _}} = object,
activity_id \\ nil,
local \\ true
) do
with true <- is_public?(object),
announce_data <- make_announce_data(user, object, activity_id),
{:ok, activity} <- insert(announce_data, local),
{:ok, object} <- add_announce_to_object(activity, object),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
error -> {:error, error}
end
end
def follow(follower, followed, activity_id \\ nil, local \\ true) do
with data <- make_follow_data(follower, followed, activity_id),
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def unfollow(follower, followed, local \\ true) do
with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
unfollow_data <- make_unfollow_data(follower, followed, follow_activity),
{:ok, activity} <- insert(unfollow_data, local),
:ok,
maybe_federate(activity) do
{:ok, activity}
end
end
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
user = User.get_cached_by_ap_id(actor)
data = %{
"type" => "Delete",
"actor" => actor,
"object" => id,
"to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
}
with Repo.delete(object),
Repo.delete_all(Activity.all_non_create_by_object_ap_id_q(id)),
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity),
{:ok, _actor} <- User.decrease_note_count(user) do
{:ok, activity}
end
end
def fetch_activities_for_context(context, opts \\ %{}) do
public = ["https://www.w3.org/ns/activitystreams#Public"]
recipients =
if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
query = from(activity in Activity)
query =
query
|> restrict_blocked(opts)
|> restrict_recipients(recipients, opts["user"])
query =
from(
activity in query,
where:
fragment(
"?->>'type' = ? and ?->>'context' = ?",
activity.data,
"Create",
activity.data,
^context
),
order_by: [desc: :id]
)
Repo.all(query)
end
# TODO: Make this work properly with unlisted.
def fetch_public_activities(opts \\ %{}) do
q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
q
|> Repo.all()
|> Enum.reverse()
end
@valid_visibilities ~w[direct unlisted public private]
defp restrict_visibility(query, %{visibility: "direct"}) do
public = "https://www.w3.org/ns/activitystreams#Public"
from(
activity in query,
join: sender in User,
on: sender.ap_id == activity.actor,
+ # Are non-direct statuses with no to/cc possible?
where:
- fragment("not data->'to' \\? ?", ^public) and fragment("not data->'cc' \\? ?", ^public) and
- fragment("not data->'to' \\? ?", sender.follower_address)
+ fragment("not coalesce(data->'to' \\? ?, false)", ^public) and
+ fragment("not coalesce(data->'cc' \\? ?, false)", ^public) and
+ fragment("not coalesce(data->'to' \\? ?, false)", sender.follower_address)
)
end
defp restrict_visibility(_query, %{visibility: visibility})
when visibility not in @valid_visibilities do
Logger.error("Could not restrict visibility to #{visibility}")
end
defp restrict_visibility(query, _visibility), do: query
defp restrict_since(query, %{"since_id" => since_id}) do
from(activity in query, where: activity.id > ^since_id)
end
defp restrict_since(query, _), do: query
defp restrict_tag(query, %{"tag" => tag}) do
from(
activity in query,
where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
)
end
defp restrict_tag(query, _), do: query
defp restrict_recipients(query, [], _user), do: query
defp restrict_recipients(query, recipients, nil) do
from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
end
defp restrict_recipients(query, recipients, user) do
from(
activity in query,
where: fragment("? && ?", ^recipients, activity.recipients),
or_where: activity.actor == ^user.ap_id
)
end
defp restrict_limit(query, %{"limit" => limit}) do
from(activity in query, limit: ^limit)
end
defp restrict_limit(query, _), do: query
defp restrict_local(query, %{"local_only" => true}) do
from(activity in query, where: activity.local == true)
end
defp restrict_local(query, _), do: query
defp restrict_max(query, %{"max_id" => max_id}) do
from(activity in query, where: activity.id < ^max_id)
end
defp restrict_max(query, _), do: query
defp restrict_actor(query, %{"actor_id" => actor_id}) do
from(activity in query, where: activity.actor == ^actor_id)
end
defp restrict_actor(query, _), do: query
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
restrict_type(query, %{"type" => [type]})
end
defp restrict_type(query, %{"type" => type}) do
from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
end
defp restrict_type(query, _), do: query
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
from(
activity in query,
where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
)
end
defp restrict_favorited_by(query, _), do: query
defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
from(
activity in query,
where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
)
end
defp restrict_media(query, _), do: query
# Only search through last 100_000 activities by default
defp restrict_recent(query, %{"whole_db" => true}), do: query
defp restrict_recent(query, _) do
since = (Repo.aggregate(Activity, :max, :id) || 0) - 100_000
from(activity in query, where: activity.id > ^since)
end
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
blocks = info["blocks"] || []
from(
activity in query,
where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks)
)
end
defp restrict_blocked(query, _), do: query
def fetch_activities_query(recipients, opts \\ %{}) do
base_query =
from(
activity in Activity,
limit: 20,
order_by: [fragment("? desc nulls last", activity.id)]
)
base_query
|> restrict_recipients(recipients, opts["user"])
|> restrict_tag(opts)
|> restrict_since(opts)
|> restrict_local(opts)
|> restrict_limit(opts)
|> restrict_max(opts)
|> restrict_actor(opts)
|> restrict_type(opts)
|> restrict_favorited_by(opts)
|> restrict_recent(opts)
|> restrict_blocked(opts)
|> restrict_media(opts)
|> restrict_visibility(opts)
end
def fetch_activities(recipients, opts \\ %{}) do
fetch_activities_query(recipients, opts)
|> Repo.all()
|> Enum.reverse()
end
def upload(file) do
data = Upload.store(file)
Repo.insert(%Object{data: data})
end
def user_data_from_user_object(data) do
avatar =
data["icon"]["url"] &&
%{
"type" => "Image",
"url" => [%{"href" => data["icon"]["url"]}]
}
banner =
data["image"]["url"] &&
%{
"type" => "Image",
"url" => [%{"href" => data["image"]["url"]}]
}
user_data = %{
ap_id: data["id"],
info: %{
"ap_enabled" => true,
"source_data" => data,
"banner" => banner
},
avatar: avatar,
nickname: "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}",
name: data["name"],
follower_address: data["followers"],
bio: data["summary"]
}
{:ok, user_data}
end
def fetch_and_prepare_user_from_ap_id(ap_id) do
with {:ok, %{status_code: 200, body: body}} <-
@httpoison.get(ap_id, Accept: "application/activity+json"),
{:ok, data} <- Jason.decode(body) do
user_data_from_user_object(data)
else
e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
end
end
def make_user_from_ap_id(ap_id) do
if _user = User.get_by_ap_id(ap_id) do
Transmogrifier.upgrade_user_from_ap_id(ap_id)
else
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
User.insert_or_update_user(data)
else
e -> {:error, e}
end
end
end
def make_user_from_nickname(nickname) do
with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
make_user_from_ap_id(ap_id)
else
_e -> {:error, "No AP id in WebFinger"}
end
end
def publish(actor, activity) do
followers =
if actor.follower_address in activity.recipients do
{:ok, followers} = User.get_followers(actor)
followers |> Enum.filter(&(!&1.local))
else
[]
end
remote_inboxes =
(Pleroma.Web.Salmon.remote_users(activity) ++ followers)
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
|> Enum.map(fn %{info: %{"source_data" => data}} ->
(data["endpoints"] && data["endpoints"]["sharedInbox"]) || data["inbox"]
end)
|> Enum.uniq()
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data)
Enum.each(remote_inboxes, fn inbox ->
Federator.enqueue(:publish_single_ap, %{
inbox: inbox,
json: json,
actor: actor,
id: activity.data["id"]
})
end)
end
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
Logger.info("Federating #{id} to #{inbox}")
host = URI.parse(inbox).host
signature =
Pleroma.Web.HTTPSignatures.sign(actor, %{host: host, "content-length": byte_size(json)})
@httpoison.post(
inbox,
json,
[{"Content-Type", "application/activity+json"}, {"signature", signature}],
hackney: [pool: :default]
)
end
# TODO:
# This will create a Create activity, which we need internally at the moment.
def fetch_object_from_id(id) do
if object = Object.get_cached_by_ap_id(id) do
{:ok, object}
else
Logger.info("Fetching #{id} via AP")
with true <- String.starts_with?(id, "http"),
{:ok, %{body: body, status_code: code}} when code in 200..299 <-
@httpoison.get(
id,
[Accept: "application/activity+json"],
follow_redirect: true,
timeout: 10000,
recv_timeout: 20000
),
{:ok, data} <- Jason.decode(body),
nil <- Object.get_by_ap_id(data["id"]),
params <- %{
"type" => "Create",
"to" => data["to"],
"cc" => data["cc"],
"actor" => data["attributedTo"],
"object" => data
},
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
{:ok, Object.get_by_ap_id(activity.data["object"]["id"])}
else
object = %Object{} ->
{:ok, object}
_e ->
Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
case OStatus.fetch_activity_from_url(id) do
{:ok, [activity | _]} -> {:ok, Object.get_by_ap_id(activity.data["object"]["id"])}
e -> e
end
end
end
end
def is_public?(activity) do
"https://www.w3.org/ns/activitystreams#Public" in (activity.data["to"] ++
(activity.data["cc"] || []))
end
def visible_for_user?(activity, nil) do
is_public?(activity)
end
def visible_for_user?(activity, user) do
x = [user.ap_id | user.following]
y = activity.data["to"] ++ (activity.data["cc"] || [])
visible_for_user?(activity, nil) || Enum.any?(x, &(&1 in y))
end
end
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, May 14, 7:19 AM (1 d, 11 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
166695
Default Alt Text
(17 KB)
Attached To
Mode
rPUBE pleroma-upstream
Attached
Detach File
Event Timeline
Log In to Comment