Page MenuHomePhorge

No OneTemporary

Size
112 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex
index dd6805125..bed96861f 100644
--- a/lib/pleroma/activity.ex
+++ b/lib/pleroma/activity.ex
@@ -1,81 +1,85 @@
defmodule Pleroma.Activity do
use Ecto.Schema
alias Pleroma.{Repo, Activity, Notification}
import Ecto.Query
schema "activities" do
field(:data, :map)
field(:local, :boolean, default: true)
field(:actor, :string)
field(:recipients, {:array, :string})
has_many(:notifications, Notification, on_delete: :delete_all)
timestamps()
end
def get_by_ap_id(ap_id) do
Repo.one(
from(
activity in Activity,
where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
)
)
end
# TODO:
# Go through these and fix them everywhere.
# Wrong name, only returns create activities
def all_by_object_ap_id_q(ap_id) do
from(
activity in Activity,
where:
fragment(
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
activity.data,
activity.data,
^to_string(ap_id)
),
where: fragment("(?)->>'type' = 'Create'", activity.data)
)
end
# Wrong name, returns all.
def all_non_create_by_object_ap_id_q(ap_id) do
from(
activity in Activity,
where:
fragment(
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
activity.data,
activity.data,
^to_string(ap_id)
)
)
end
# Wrong name plz fix thx
def all_by_object_ap_id(ap_id) do
Repo.all(all_by_object_ap_id_q(ap_id))
end
def create_activity_by_object_id_query(ap_ids) do
from(
activity in Activity,
where:
fragment(
"coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
activity.data,
activity.data,
^ap_ids
),
where: fragment("(?)->>'type' = 'Create'", activity.data)
)
end
def get_create_activity_by_object_ap_id(ap_id) when is_binary(ap_id) do
create_activity_by_object_id_query([ap_id])
|> Repo.one()
end
def get_create_activity_by_object_ap_id(_), do: nil
+
+ def normalize(obj) when is_map(obj), do: Activity.get_by_ap_id(obj["id"])
+ def normalize(ap_id) when is_binary(ap_id), do: Activity.get_by_ap_id(ap_id)
+ def normalize(_), do: nil
end
diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex
index ff2af4a6f..1bcff5a7b 100644
--- a/lib/pleroma/object.ex
+++ b/lib/pleroma/object.ex
@@ -1,51 +1,55 @@
defmodule Pleroma.Object do
use Ecto.Schema
alias Pleroma.{Repo, Object}
import Ecto.{Query, Changeset}
schema "objects" do
field(:data, :map)
timestamps()
end
def create(data) do
Object.change(%Object{}, %{data: data})
|> Repo.insert()
end
def change(struct, params \\ %{}) do
struct
|> cast(params, [:data])
|> validate_required([:data])
|> unique_constraint(:ap_id, name: :objects_unique_apid_index)
end
def get_by_ap_id(nil), do: nil
def get_by_ap_id(ap_id) do
Repo.one(from(object in Object, where: fragment("(?)->>'id' = ?", object.data, ^ap_id)))
end
+ def normalize(obj) when is_map(obj), do: Object.get_by_ap_id(obj["id"])
+ def normalize(ap_id) when is_binary(ap_id), do: Object.get_by_ap_id(ap_id)
+ def normalize(_), do: nil
+
def get_cached_by_ap_id(ap_id) do
if Mix.env() == :test do
get_by_ap_id(ap_id)
else
key = "object:#{ap_id}"
Cachex.fetch!(:user_cache, key, fn _ ->
object = get_by_ap_id(ap_id)
if object do
{:commit, object}
else
{:ignore, object}
end
end)
end
end
def context_mapping(context) do
Object.change(%Object{}, %{data: %{"id" => context}})
end
end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 94f16c3c0..df22d29a8 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -1,688 +1,688 @@
defmodule Pleroma.User do
use Ecto.Schema
import Ecto.{Changeset, Query}
alias Pleroma.{Repo, User, Object, Web, Activity, Notification}
alias Comeonin.Pbkdf2
alias Pleroma.Web.{OStatus, Websub}
alias Pleroma.Web.ActivityPub.{Utils, ActivityPub}
schema "users" do
field(:bio, :string)
field(:email, :string)
field(:name, :string)
field(:nickname, :string)
field(:password_hash, :string)
field(:password, :string, virtual: true)
field(:password_confirmation, :string, virtual: true)
field(:following, {:array, :string}, default: [])
field(:ap_id, :string)
field(:avatar, :map)
field(:local, :boolean, default: true)
field(:info, :map, default: %{})
field(:follower_address, :string)
field(:search_distance, :float, virtual: true)
has_many(:notifications, Notification)
timestamps()
end
def avatar_url(user) do
case user.avatar do
%{"url" => [%{"href" => href} | _]} -> href
_ -> "#{Web.base_url()}/images/avi.png"
end
end
def banner_url(user) do
case user.info["banner"] do
%{"url" => [%{"href" => href} | _]} -> href
_ -> "#{Web.base_url()}/images/banner.png"
end
end
def ap_id(%User{nickname: nickname}) do
"#{Web.base_url()}/users/#{nickname}"
end
def ap_followers(%User{} = user) do
"#{ap_id(user)}/followers"
end
def follow_changeset(struct, params \\ %{}) do
struct
|> cast(params, [:following])
|> validate_required([:following])
end
def info_changeset(struct, params \\ %{}) do
struct
|> cast(params, [:info])
|> validate_required([:info])
end
def user_info(%User{} = user) do
oneself = if user.local, do: 1, else: 0
%{
following_count: length(user.following) - oneself,
note_count: user.info["note_count"] || 0,
follower_count: user.info["follower_count"] || 0,
locked: user.info["locked"] || false
}
end
@email_regex ~r/^[a-zA-Z0-9.!#$%&'*+\/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$/
def remote_user_creation(params) do
changes =
%User{}
|> cast(params, [:bio, :name, :ap_id, :nickname, :info, :avatar])
|> validate_required([:name, :ap_id, :nickname])
|> unique_constraint(:nickname)
|> validate_format(:nickname, @email_regex)
|> validate_length(:bio, max: 5000)
|> validate_length(:name, max: 100)
|> put_change(:local, false)
if changes.valid? do
case changes.changes[:info]["source_data"] do
%{"followers" => followers} ->
changes
|> put_change(:follower_address, followers)
_ ->
followers = User.ap_followers(%User{nickname: changes.changes[:nickname]})
changes
|> put_change(:follower_address, followers)
end
else
changes
end
end
def update_changeset(struct, params \\ %{}) do
struct
|> cast(params, [:bio, :name])
|> unique_constraint(:nickname)
|> validate_format(:nickname, ~r/^[a-zA-Z\d]+$/)
|> validate_length(:bio, max: 5000)
|> validate_length(:name, min: 1, max: 100)
end
def upgrade_changeset(struct, params \\ %{}) do
struct
|> cast(params, [:bio, :name, :info, :follower_address, :avatar])
|> unique_constraint(:nickname)
|> validate_format(:nickname, ~r/^[a-zA-Z\d]+$/)
|> validate_length(:bio, max: 5000)
|> validate_length(:name, max: 100)
end
def password_update_changeset(struct, params) do
changeset =
struct
|> cast(params, [:password, :password_confirmation])
|> validate_required([:password, :password_confirmation])
|> validate_confirmation(:password)
if changeset.valid? do
hashed = Pbkdf2.hashpwsalt(changeset.changes[:password])
changeset
|> put_change(:password_hash, hashed)
else
changeset
end
end
def reset_password(user, data) do
update_and_set_cache(password_update_changeset(user, data))
end
def register_changeset(struct, params \\ %{}) do
changeset =
struct
|> cast(params, [:bio, :email, :name, :nickname, :password, :password_confirmation])
|> validate_required([:email, :name, :nickname, :password, :password_confirmation])
|> validate_confirmation(:password)
|> unique_constraint(:email)
|> unique_constraint(:nickname)
|> validate_format(:nickname, ~r/^[a-zA-Z\d]+$/)
|> validate_format(:email, @email_regex)
|> validate_length(:bio, max: 1000)
|> validate_length(:name, min: 1, max: 100)
if changeset.valid? do
hashed = Pbkdf2.hashpwsalt(changeset.changes[:password])
ap_id = User.ap_id(%User{nickname: changeset.changes[:nickname]})
followers = User.ap_followers(%User{nickname: changeset.changes[:nickname]})
changeset
|> put_change(:password_hash, hashed)
|> put_change(:ap_id, ap_id)
|> put_change(:following, [followers])
|> put_change(:follower_address, followers)
else
changeset
end
end
def maybe_direct_follow(%User{} = follower, %User{info: info} = followed) do
user_config = Application.get_env(:pleroma, :user)
deny_follow_blocked = Keyword.get(user_config, :deny_follow_blocked)
user_info = user_info(followed)
should_direct_follow =
cond do
# if the account is locked, don't pre-create the relationship
user_info[:locked] == true ->
false
# if the users are blocking each other, we shouldn't even be here, but check for it anyway
deny_follow_blocked and
(User.blocks?(follower, followed) or User.blocks?(followed, follower)) ->
false
# if OStatus, then there is no three-way handshake to follow
User.ap_enabled?(followed) != true ->
true
# if there are no other reasons not to, just pre-create the relationship
true ->
true
end
if should_direct_follow do
follow(follower, followed)
else
{:ok, follower}
end
end
def maybe_follow(%User{} = follower, %User{info: info} = followed) do
if not following?(follower, followed) do
follow(follower, followed)
else
{:ok, follower}
end
end
def follow(%User{} = follower, %User{info: info} = followed) do
user_config = Application.get_env(:pleroma, :user)
deny_follow_blocked = Keyword.get(user_config, :deny_follow_blocked)
ap_followers = followed.follower_address
cond do
following?(follower, followed) or info["deactivated"] ->
{:error, "Could not follow user: #{followed.nickname} is already on your list."}
deny_follow_blocked and blocks?(followed, follower) ->
{:error, "Could not follow user: #{followed.nickname} blocked you."}
true ->
if !followed.local && follower.local && !ap_enabled?(followed) do
Websub.subscribe(follower, followed)
end
following =
[ap_followers | follower.following]
|> Enum.uniq()
follower =
follower
|> follow_changeset(%{following: following})
|> update_and_set_cache
{:ok, _} = update_follower_count(followed)
follower
end
end
def unfollow(%User{} = follower, %User{} = followed) do
ap_followers = followed.follower_address
if following?(follower, followed) and follower.ap_id != followed.ap_id do
following =
follower.following
|> List.delete(ap_followers)
{:ok, follower} =
follower
|> follow_changeset(%{following: following})
|> update_and_set_cache
{:ok, followed} = update_follower_count(followed)
{:ok, follower, Utils.fetch_latest_follow(follower, followed)}
else
{:error, "Not subscribed!"}
end
end
def following?(%User{} = follower, %User{} = followed) do
Enum.member?(follower.following, followed.follower_address)
end
def locked?(%User{} = user) do
user.info["locked"] || false
end
def get_by_ap_id(ap_id) do
Repo.get_by(User, ap_id: ap_id)
end
def update_and_set_cache(changeset) do
with {:ok, user} <- Repo.update(changeset) do
Cachex.put(:user_cache, "ap_id:#{user.ap_id}", user)
Cachex.put(:user_cache, "nickname:#{user.nickname}", user)
Cachex.put(:user_cache, "user_info:#{user.id}", user_info(user))
{:ok, user}
else
e -> e
end
end
def invalidate_cache(user) do
Cachex.del(:user_cache, "ap_id:#{user.ap_id}")
Cachex.del(:user_cache, "nickname:#{user.nickname}")
end
def get_cached_by_ap_id(ap_id) do
key = "ap_id:#{ap_id}"
Cachex.fetch!(:user_cache, key, fn _ -> get_by_ap_id(ap_id) end)
end
def get_cached_by_nickname(nickname) do
key = "nickname:#{nickname}"
Cachex.fetch!(:user_cache, key, fn _ -> get_or_fetch_by_nickname(nickname) end)
end
def get_by_nickname(nickname) do
Repo.get_by(User, nickname: nickname)
end
def get_by_nickname_or_email(nickname_or_email) do
case user = Repo.get_by(User, nickname: nickname_or_email) do
%User{} -> user
nil -> Repo.get_by(User, email: nickname_or_email)
end
end
def get_cached_user_info(user) do
key = "user_info:#{user.id}"
Cachex.fetch!(:user_cache, key, fn _ -> user_info(user) end)
end
def fetch_by_nickname(nickname) do
ap_try = ActivityPub.make_user_from_nickname(nickname)
case ap_try do
{:ok, user} -> {:ok, user}
_ -> OStatus.make_user(nickname)
end
end
def get_or_fetch_by_nickname(nickname) do
with %User{} = user <- get_by_nickname(nickname) do
user
else
_e ->
with [_nick, _domain] <- String.split(nickname, "@"),
{:ok, user} <- fetch_by_nickname(nickname) do
user
else
_e -> nil
end
end
end
def get_followers_query(%User{id: id, follower_address: follower_address}) do
from(
u in User,
where: fragment("? <@ ?", ^[follower_address], u.following),
where: u.id != ^id
)
end
def get_followers(user) do
q = get_followers_query(user)
{:ok, Repo.all(q)}
end
def get_friends_query(%User{id: id, following: following}) do
from(
u in User,
where: u.follower_address in ^following,
where: u.id != ^id
)
end
def get_friends(user) do
q = get_friends_query(user)
{:ok, Repo.all(q)}
end
def get_follow_requests_query(%User{} = user) do
from(
a in Activity,
where:
fragment(
"? ->> 'type' = 'Follow'",
a.data
),
where:
fragment(
"? ->> 'state' = 'pending'",
a.data
),
where:
fragment(
"? @> ?",
a.data,
^%{"object" => user.ap_id}
)
)
end
def get_follow_requests(%User{} = user) do
q = get_follow_requests_query(user)
reqs = Repo.all(q)
users =
Enum.map(reqs, fn req -> req.actor end)
|> Enum.uniq()
|> Enum.map(fn ap_id -> get_by_ap_id(ap_id) end)
{:ok, users}
end
def increase_note_count(%User{} = user) do
note_count = (user.info["note_count"] || 0) + 1
new_info = Map.put(user.info, "note_count", note_count)
cs = info_changeset(user, %{info: new_info})
update_and_set_cache(cs)
end
def decrease_note_count(%User{} = user) do
note_count = user.info["note_count"] || 0
note_count = if note_count <= 0, do: 0, else: note_count - 1
new_info = Map.put(user.info, "note_count", note_count)
cs = info_changeset(user, %{info: new_info})
update_and_set_cache(cs)
end
def update_note_count(%User{} = user) do
note_count_query =
from(
a in Object,
where: fragment("?->>'actor' = ? and ?->>'type' = 'Note'", a.data, ^user.ap_id, a.data),
select: count(a.id)
)
note_count = Repo.one(note_count_query)
new_info = Map.put(user.info, "note_count", note_count)
cs = info_changeset(user, %{info: new_info})
update_and_set_cache(cs)
end
def update_follower_count(%User{} = user) do
follower_count_query =
from(
u in User,
where: ^user.follower_address in u.following,
where: u.id != ^user.id,
select: count(u.id)
)
follower_count = Repo.one(follower_count_query)
new_info = Map.put(user.info, "follower_count", follower_count)
cs = info_changeset(user, %{info: new_info})
update_and_set_cache(cs)
end
def get_notified_from_activity(%Activity{recipients: to}) do
query =
from(
u in User,
where: u.ap_id in ^to,
where: u.local == true
)
Repo.all(query)
end
def get_recipients_from_activity(%Activity{recipients: to}) do
query =
from(
u in User,
where: u.ap_id in ^to,
or_where: fragment("? && ?", u.following, ^to)
)
query = from(u in query, where: u.local == true)
Repo.all(query)
end
def search(query, resolve) do
# strip the beginning @ off if there is a query
query = String.trim_leading(query, "@")
if resolve do
User.get_or_fetch_by_nickname(query)
end
inner =
from(
u in User,
select_merge: %{
search_distance:
fragment(
"? <-> (? || ?)",
^query,
u.nickname,
u.name
)
}
)
q =
from(
s in subquery(inner),
order_by: s.search_distance,
limit: 20
)
Repo.all(q)
end
def block(blocker, %User{ap_id: ap_id} = blocked) do
# sever any follow relationships to prevent leaks per activitypub (Pleroma issue #213)
blocker =
if following?(blocker, blocked) do
{:ok, blocker, _} = unfollow(blocker, blocked)
blocker
else
blocker
end
if following?(blocked, blocker) do
unfollow(blocked, blocker)
end
blocks = blocker.info["blocks"] || []
new_blocks = Enum.uniq([ap_id | blocks])
new_info = Map.put(blocker.info, "blocks", new_blocks)
cs = User.info_changeset(blocker, %{info: new_info})
update_and_set_cache(cs)
end
# helper to handle the block given only an actor's AP id
def block(blocker, %{ap_id: ap_id}) do
block(blocker, User.get_by_ap_id(ap_id))
end
def unblock(user, %{ap_id: ap_id}) do
blocks = user.info["blocks"] || []
new_blocks = List.delete(blocks, ap_id)
new_info = Map.put(user.info, "blocks", new_blocks)
cs = User.info_changeset(user, %{info: new_info})
update_and_set_cache(cs)
end
def blocks?(user, %{ap_id: ap_id}) do
blocks = user.info["blocks"] || []
domain_blocks = user.info["domain_blocks"] || []
%{host: host} = URI.parse(ap_id)
Enum.member?(blocks, ap_id) ||
Enum.any?(domain_blocks, fn domain ->
host == domain
end)
end
def block_domain(user, domain) do
domain_blocks = user.info["domain_blocks"] || []
new_blocks = Enum.uniq([domain | domain_blocks])
new_info = Map.put(user.info, "domain_blocks", new_blocks)
cs = User.info_changeset(user, %{info: new_info})
update_and_set_cache(cs)
end
def unblock_domain(user, domain) do
blocks = user.info["domain_blocks"] || []
new_blocks = List.delete(blocks, domain)
new_info = Map.put(user.info, "domain_blocks", new_blocks)
cs = User.info_changeset(user, %{info: new_info})
update_and_set_cache(cs)
end
def local_user_query() do
from(u in User, where: u.local == true)
end
def deactivate(%User{} = user) do
new_info = Map.put(user.info, "deactivated", true)
cs = User.info_changeset(user, %{info: new_info})
update_and_set_cache(cs)
end
def delete(%User{} = user) do
{:ok, user} = User.deactivate(user)
# Remove all relationships
{:ok, followers} = User.get_followers(user)
followers
|> Enum.each(fn follower -> User.unfollow(follower, user) end)
{:ok, friends} = User.get_friends(user)
friends
|> Enum.each(fn followed -> User.unfollow(user, followed) end)
query = from(a in Activity, where: a.actor == ^user.ap_id)
Repo.all(query)
|> Enum.each(fn activity ->
case activity.data["type"] do
"Create" ->
- ActivityPub.delete(Object.get_by_ap_id(activity.data["object"]["id"]))
+ ActivityPub.delete(Object.normalize(activity.data["object"]))
# TODO: Do something with likes, follows, repeats.
_ ->
"Doing nothing"
end
end)
:ok
end
def get_or_fetch_by_ap_id(ap_id) do
if user = get_by_ap_id(ap_id) do
user
else
ap_try = ActivityPub.make_user_from_ap_id(ap_id)
case ap_try do
{:ok, user} ->
user
_ ->
case OStatus.make_user(ap_id) do
{:ok, user} -> user
_ -> {:error, "Could not fetch by AP id"}
end
end
end
end
# AP style
def public_key_from_info(%{
"source_data" => %{"publicKey" => %{"publicKeyPem" => public_key_pem}}
}) do
key =
:public_key.pem_decode(public_key_pem)
|> hd()
|> :public_key.pem_entry_decode()
{:ok, key}
end
# OStatus Magic Key
def public_key_from_info(%{"magic_key" => magic_key}) do
{:ok, Pleroma.Web.Salmon.decode_key(magic_key)}
end
def get_public_key_for_ap_id(ap_id) do
with %User{} = user <- get_or_fetch_by_ap_id(ap_id),
{:ok, public_key} <- public_key_from_info(user.info) do
{:ok, public_key}
else
_ -> :error
end
end
defp blank?(""), do: nil
defp blank?(n), do: n
def insert_or_update_user(data) do
data =
data
|> Map.put(:name, blank?(data[:name]) || data[:nickname])
cs = User.remote_user_creation(data)
Repo.insert(cs, on_conflict: :replace_all, conflict_target: :nickname)
end
def ap_enabled?(%User{info: info}), do: info["ap_enabled"]
def ap_enabled?(_), do: false
def get_or_fetch(uri_or_nickname) do
if String.starts_with?(uri_or_nickname, "http") do
get_or_fetch_by_ap_id(uri_or_nickname)
else
get_or_fetch_by_nickname(uri_or_nickname)
end
end
end
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 195679fad..464832a1e 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -1,712 +1,712 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.{Activity, Repo, Object, Upload, User, Notification}
alias Pleroma.Web.ActivityPub.{Transmogrifier, MRF}
alias Pleroma.Web.WebFinger
alias Pleroma.Web.Federator
alias Pleroma.Web.OStatus
import Ecto.Query
import Pleroma.Web.ActivityPub.Utils
require Logger
@httpoison Application.get_env(:pleroma, :httpoison)
@instance Application.get_env(:pleroma, :instance)
def get_recipients(data) do
(data["to"] || []) ++ (data["cc"] || [])
end
defp check_actor_is_active(actor) do
if not is_nil(actor) do
with user <- User.get_cached_by_ap_id(actor),
nil <- user.info["deactivated"] do
:ok
else
_e -> :reject
end
else
:ok
end
end
def insert(map, local \\ true) when is_map(map) do
- with nil <- Activity.get_by_ap_id(map["id"]),
+ with nil <- Activity.normalize(map),
map <- lazy_put_activity_defaults(map),
:ok <- check_actor_is_active(map["actor"]),
{:ok, map} <- MRF.filter(map),
:ok <- insert_full_object(map) do
{:ok, activity} =
Repo.insert(%Activity{
data: map,
local: local,
actor: map["actor"],
recipients: get_recipients(map)
})
Notification.create_notifications(activity)
stream_out(activity)
{:ok, activity}
else
%Activity{} = activity -> {:ok, activity}
error -> {:error, error}
end
end
def stream_out(activity) do
public = "https://www.w3.org/ns/activitystreams#Public"
if activity.data["type"] in ["Create", "Announce"] do
Pleroma.Web.Streamer.stream("user", activity)
Pleroma.Web.Streamer.stream("list", activity)
if Enum.member?(activity.data["to"], public) do
Pleroma.Web.Streamer.stream("public", activity)
if activity.local do
Pleroma.Web.Streamer.stream("public:local", activity)
end
if activity.data["object"]["attachment"] != [] do
Pleroma.Web.Streamer.stream("public:media", activity)
if activity.local do
Pleroma.Web.Streamer.stream("public:local:media", activity)
end
end
else
if !Enum.member?(activity.data["cc"] || [], public) &&
!Enum.member?(
activity.data["to"],
User.get_by_ap_id(activity.data["actor"]).follower_address
),
do: Pleroma.Web.Streamer.stream("direct", activity)
end
end
end
def create(%{to: to, actor: actor, context: context, object: object} = params) do
additional = params[:additional] || %{}
# only accept false as false value
local = !(params[:local] == false)
published = params[:published]
with create_data <-
make_create_data(
%{to: to, actor: actor, published: published, context: context, object: object},
additional
),
{:ok, activity} <- insert(create_data, local),
:ok <- maybe_federate(activity),
{:ok, _actor} <- User.increase_note_count(actor) do
{:ok, activity}
end
end
def accept(%{to: to, actor: actor, object: object} = params) do
# only accept false as false value
local = !(params[:local] == false)
with data <- %{"to" => to, "type" => "Accept", "actor" => actor, "object" => object},
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def reject(%{to: to, actor: actor, object: object} = params) do
# only accept false as false value
local = !(params[:local] == false)
with data <- %{"to" => to, "type" => "Reject", "actor" => actor, "object" => object},
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
# only accept false as false value
local = !(params[:local] == false)
with data <- %{
"to" => to,
"cc" => cc,
"type" => "Update",
"actor" => actor,
"object" => object
},
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
# TODO: This is weird, maybe we shouldn't check here if we can make the activity.
def like(
%User{ap_id: ap_id} = user,
%Object{data: %{"id" => _}} = object,
activity_id \\ nil,
local \\ true
) do
with nil <- get_existing_like(ap_id, object),
like_data <- make_like_data(user, object, activity_id),
{:ok, activity} <- insert(like_data, local),
{:ok, object} <- add_like_to_object(activity, object),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
%Activity{} = activity -> {:ok, activity, object}
error -> {:error, error}
end
end
def unlike(
%User{} = actor,
%Object{} = object,
activity_id \\ nil,
local \\ true
) do
with %Activity{} = like_activity <- get_existing_like(actor.ap_id, object),
unlike_data <- make_unlike_data(actor, like_activity, activity_id),
{:ok, unlike_activity} <- insert(unlike_data, local),
{:ok, _activity} <- Repo.delete(like_activity),
{:ok, object} <- remove_like_from_object(like_activity, object),
:ok <- maybe_federate(unlike_activity) do
{:ok, unlike_activity, like_activity, object}
else
_e -> {:ok, object}
end
end
def announce(
%User{ap_id: _} = user,
%Object{data: %{"id" => _}} = object,
activity_id \\ nil,
local \\ true
) do
with true <- is_public?(object),
announce_data <- make_announce_data(user, object, activity_id),
{:ok, activity} <- insert(announce_data, local),
{:ok, object} <- add_announce_to_object(activity, object),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
error -> {:error, error}
end
end
def unannounce(
%User{} = actor,
%Object{} = object,
activity_id \\ nil,
local \\ true
) do
with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
{:ok, unannounce_activity} <- insert(unannounce_data, local),
:ok <- maybe_federate(unannounce_activity),
{:ok, _activity} <- Repo.delete(announce_activity),
{:ok, object} <- remove_announce_from_object(announce_activity, object) do
{:ok, unannounce_activity, object}
else
_e -> {:ok, object}
end
end
def follow(follower, followed, activity_id \\ nil, local \\ true) do
with data <- make_follow_data(follower, followed, activity_id),
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
{:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
{:ok, activity} <- insert(unfollow_data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def delete(%Object{data: %{"id" => id, "actor" => actor}} = object, local \\ true) do
user = User.get_cached_by_ap_id(actor)
data = %{
"type" => "Delete",
"actor" => actor,
"object" => id,
"to" => [user.follower_address, "https://www.w3.org/ns/activitystreams#Public"]
}
with Repo.delete(object),
Repo.delete_all(Activity.all_non_create_by_object_ap_id_q(id)),
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity),
{:ok, _actor} <- User.decrease_note_count(user) do
{:ok, activity}
end
end
def block(blocker, blocked, activity_id \\ nil, local \\ true) do
ap_config = Application.get_env(:pleroma, :activitypub)
unfollow_blocked = Keyword.get(ap_config, :unfollow_blocked)
outgoing_blocks = Keyword.get(ap_config, :outgoing_blocks)
with true <- unfollow_blocked do
follow_activity = fetch_latest_follow(blocker, blocked)
if follow_activity do
unfollow(blocker, blocked, nil, local)
end
end
with true <- outgoing_blocks,
block_data <- make_block_data(blocker, blocked, activity_id),
{:ok, activity} <- insert(block_data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
_e -> {:ok, nil}
end
end
def unblock(blocker, blocked, activity_id \\ nil, local \\ true) do
with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
{:ok, activity} <- insert(unblock_data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def fetch_activities_for_context(context, opts \\ %{}) do
public = ["https://www.w3.org/ns/activitystreams#Public"]
recipients =
if opts["user"], do: [opts["user"].ap_id | opts["user"].following] ++ public, else: public
query = from(activity in Activity)
query =
query
|> restrict_blocked(opts)
|> restrict_recipients(recipients, opts["user"])
query =
from(
activity in query,
where:
fragment(
"?->>'type' = ? and ?->>'context' = ?",
activity.data,
"Create",
activity.data,
^context
),
order_by: [desc: :id]
)
Repo.all(query)
end
def fetch_public_activities(opts \\ %{}) do
q = fetch_activities_query(["https://www.w3.org/ns/activitystreams#Public"], opts)
q
|> restrict_unlisted()
|> Repo.all()
|> Enum.reverse()
end
@valid_visibilities ~w[direct unlisted public private]
defp restrict_visibility(query, %{visibility: "direct"}) do
public = "https://www.w3.org/ns/activitystreams#Public"
from(
activity in query,
join: sender in User,
on: sender.ap_id == activity.actor,
# Are non-direct statuses with no to/cc possible?
where:
fragment(
"not (? && ?)",
[^public, sender.follower_address],
activity.recipients
)
)
end
defp restrict_visibility(_query, %{visibility: visibility})
when visibility not in @valid_visibilities do
Logger.error("Could not restrict visibility to #{visibility}")
end
defp restrict_visibility(query, _visibility), do: query
def fetch_user_activities(user, reading_user, params \\ %{}) do
params =
params
|> Map.put("type", ["Create", "Announce"])
|> Map.put("actor_id", user.ap_id)
|> Map.put("whole_db", true)
recipients =
if reading_user do
["https://www.w3.org/ns/activitystreams#Public"] ++
[reading_user.ap_id | reading_user.following]
else
["https://www.w3.org/ns/activitystreams#Public"]
end
fetch_activities(recipients, params)
|> Enum.reverse()
end
defp restrict_since(query, %{"since_id" => since_id}) do
from(activity in query, where: activity.id > ^since_id)
end
defp restrict_since(query, _), do: query
defp restrict_tag(query, %{"tag" => tag}) do
from(
activity in query,
where: fragment("? <@ (? #> '{\"object\",\"tag\"}')", ^tag, activity.data)
)
end
defp restrict_tag(query, _), do: query
defp restrict_recipients(query, [], _user), do: query
defp restrict_recipients(query, recipients, nil) do
from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
end
defp restrict_recipients(query, recipients, user) do
from(
activity in query,
where: fragment("? && ?", ^recipients, activity.recipients),
or_where: activity.actor == ^user.ap_id
)
end
defp restrict_limit(query, %{"limit" => limit}) do
from(activity in query, limit: ^limit)
end
defp restrict_limit(query, _), do: query
defp restrict_local(query, %{"local_only" => true}) do
from(activity in query, where: activity.local == true)
end
defp restrict_local(query, _), do: query
defp restrict_max(query, %{"max_id" => max_id}) do
from(activity in query, where: activity.id < ^max_id)
end
defp restrict_max(query, _), do: query
defp restrict_actor(query, %{"actor_id" => actor_id}) do
from(activity in query, where: activity.actor == ^actor_id)
end
defp restrict_actor(query, _), do: query
defp restrict_type(query, %{"type" => type}) when is_binary(type) do
restrict_type(query, %{"type" => [type]})
end
defp restrict_type(query, %{"type" => type}) do
from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
end
defp restrict_type(query, _), do: query
defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do
from(
activity in query,
where: fragment("? <@ (? #> '{\"object\",\"likes\"}')", ^ap_id, activity.data)
)
end
defp restrict_favorited_by(query, _), do: query
defp restrict_media(query, %{"only_media" => val}) when val == "true" or val == "1" do
from(
activity in query,
where: fragment("not (? #> '{\"object\",\"attachment\"}' = ?)", activity.data, ^[])
)
end
defp restrict_media(query, _), do: query
defp restrict_replies(query, %{"exclude_replies" => val}) when val == "true" or val == "1" do
from(
activity in query,
where: fragment("?->'object'->>'inReplyTo' is null", activity.data)
)
end
defp restrict_replies(query, _), do: query
# Only search through last 100_000 activities by default
defp restrict_recent(query, %{"whole_db" => true}), do: query
defp restrict_recent(query, _) do
since = (Repo.aggregate(Activity, :max, :id) || 0) - 100_000
from(activity in query, where: activity.id > ^since)
end
defp restrict_blocked(query, %{"blocking_user" => %User{info: info}}) do
blocks = info["blocks"] || []
domain_blocks = info["domain_blocks"] || []
from(
activity in query,
where: fragment("not (? = ANY(?))", activity.actor, ^blocks),
where: fragment("not (?->'to' \\?| ?)", activity.data, ^blocks),
where: fragment("not (split_part(?, '/', 3) = ANY(?))", activity.actor, ^domain_blocks)
)
end
defp restrict_blocked(query, _), do: query
defp restrict_unlisted(query) do
from(
activity in query,
where:
fragment(
"not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
activity.data,
^["https://www.w3.org/ns/activitystreams#Public"]
)
)
end
def fetch_activities_query(recipients, opts \\ %{}) do
base_query =
from(
activity in Activity,
limit: 20,
order_by: [fragment("? desc nulls last", activity.id)]
)
base_query
|> restrict_recipients(recipients, opts["user"])
|> restrict_tag(opts)
|> restrict_since(opts)
|> restrict_local(opts)
|> restrict_limit(opts)
|> restrict_max(opts)
|> restrict_actor(opts)
|> restrict_type(opts)
|> restrict_favorited_by(opts)
|> restrict_recent(opts)
|> restrict_blocked(opts)
|> restrict_media(opts)
|> restrict_visibility(opts)
|> restrict_replies(opts)
end
def fetch_activities(recipients, opts \\ %{}) do
fetch_activities_query(recipients, opts)
|> Repo.all()
|> Enum.reverse()
end
def upload(file) do
data = Upload.store(file, Application.get_env(:pleroma, :instance)[:dedupe_media])
Repo.insert(%Object{data: data})
end
def user_data_from_user_object(data) do
avatar =
data["icon"]["url"] &&
%{
"type" => "Image",
"url" => [%{"href" => data["icon"]["url"]}]
}
banner =
data["image"]["url"] &&
%{
"type" => "Image",
"url" => [%{"href" => data["image"]["url"]}]
}
locked = data["manuallyApprovesFollowers"] || false
data = Transmogrifier.maybe_fix_user_object(data)
user_data = %{
ap_id: data["id"],
info: %{
"ap_enabled" => true,
"source_data" => data,
"banner" => banner,
"locked" => locked
},
avatar: avatar,
nickname: "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}",
name: data["name"],
follower_address: data["followers"],
bio: data["summary"]
}
{:ok, user_data}
end
def fetch_and_prepare_user_from_ap_id(ap_id) do
with {:ok, %{status_code: 200, body: body}} <-
@httpoison.get(ap_id, Accept: "application/activity+json"),
{:ok, data} <- Jason.decode(body) do
user_data_from_user_object(data)
else
e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
end
end
def make_user_from_ap_id(ap_id) do
if _user = User.get_by_ap_id(ap_id) do
Transmogrifier.upgrade_user_from_ap_id(ap_id)
else
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
User.insert_or_update_user(data)
else
e -> {:error, e}
end
end
end
def make_user_from_nickname(nickname) do
with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do
make_user_from_ap_id(ap_id)
else
_e -> {:error, "No AP id in WebFinger"}
end
end
@quarantined_instances Keyword.get(@instance, :quarantined_instances, [])
def should_federate?(inbox, public) do
if public do
true
else
inbox_info = URI.parse(inbox)
inbox_info.host not in @quarantined_instances
end
end
def publish(actor, activity) do
followers =
if actor.follower_address in activity.recipients do
{:ok, followers} = User.get_followers(actor)
followers |> Enum.filter(&(!&1.local))
else
[]
end
public = is_public?(activity)
remote_inboxes =
(Pleroma.Web.Salmon.remote_users(activity) ++ followers)
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
|> Enum.map(fn %{info: %{"source_data" => data}} ->
(data["endpoints"] && data["endpoints"]["sharedInbox"]) || data["inbox"]
end)
|> Enum.uniq()
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data)
Enum.each(remote_inboxes, fn inbox ->
Federator.enqueue(:publish_single_ap, %{
inbox: inbox,
json: json,
actor: actor,
id: activity.data["id"]
})
end)
end
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
Logger.info("Federating #{id} to #{inbox}")
host = URI.parse(inbox).host
signature =
Pleroma.Web.HTTPSignatures.sign(actor, %{host: host, "content-length": byte_size(json)})
@httpoison.post(
inbox,
json,
[{"Content-Type", "application/activity+json"}, {"signature", signature}],
hackney: [pool: :default]
)
end
# TODO:
# This will create a Create activity, which we need internally at the moment.
def fetch_object_from_id(id) do
if object = Object.get_cached_by_ap_id(id) do
{:ok, object}
else
Logger.info("Fetching #{id} via AP")
with true <- String.starts_with?(id, "http"),
{:ok, %{body: body, status_code: code}} when code in 200..299 <-
@httpoison.get(
id,
[Accept: "application/activity+json"],
follow_redirect: true,
timeout: 10000,
recv_timeout: 20000
),
{:ok, data} <- Jason.decode(body),
- nil <- Object.get_by_ap_id(data["id"]),
+ nil <- Object.normalize(data),
params <- %{
"type" => "Create",
"to" => data["to"],
"cc" => data["cc"],
"actor" => data["attributedTo"],
"object" => data
},
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
- {:ok, Object.get_by_ap_id(activity.data["object"]["id"])}
+ {:ok, Object.normalize(activity.data["object"])}
else
object = %Object{} ->
{:ok, object}
_e ->
Logger.info("Couldn't get object via AP, trying out OStatus fetching...")
case OStatus.fetch_activity_from_url(id) do
- {:ok, [activity | _]} -> {:ok, Object.get_by_ap_id(activity.data["object"]["id"])}
+ {:ok, [activity | _]} -> {:ok, Object.normalize(activity.data["object"])}
e -> e
end
end
end
end
def is_public?(activity) do
"https://www.w3.org/ns/activitystreams#Public" in (activity.data["to"] ++
(activity.data["cc"] || []))
end
def visible_for_user?(activity, nil) do
is_public?(activity)
end
def visible_for_user?(activity, user) do
x = [user.ap_id | user.following]
y = activity.data["to"] ++ (activity.data["cc"] || [])
visible_for_user?(activity, nil) || Enum.any?(x, &(&1 in y))
end
end
diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex
index 3dd3df553..0d2166196 100644
--- a/lib/pleroma/web/activity_pub/transmogrifier.ex
+++ b/lib/pleroma/web/activity_pub/transmogrifier.ex
@@ -1,744 +1,730 @@
defmodule Pleroma.Web.ActivityPub.Transmogrifier do
@moduledoc """
A module to handle coding from internal to wire ActivityPub and back.
"""
alias Pleroma.User
alias Pleroma.Object
alias Pleroma.Activity
alias Pleroma.Repo
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Utils
import Ecto.Query
require Logger
@doc """
Modifies an incoming AP object (mastodon format) to our internal format.
"""
def fix_object(object) do
object
|> fix_actor
|> fix_attachments
|> fix_context
|> fix_in_reply_to
|> fix_emoji
|> fix_tag
|> fix_content_map
end
def fix_actor(%{"attributedTo" => actor} = object) do
# attributedTo can be a list in the case of peertube or plume
actor =
if is_list(actor) do
Enum.at(actor, 0)
else
actor
end
object
|> Map.put("actor", actor)
end
def fix_in_reply_to(%{"inReplyTo" => in_reply_to_id} = object)
when not is_nil(in_reply_to_id) do
case ActivityPub.fetch_object_from_id(in_reply_to_id) do
{:ok, replied_object} ->
with %Activity{} = activity <-
Activity.get_create_activity_by_object_ap_id(replied_object.data["id"]) do
object
|> Map.put("inReplyTo", replied_object.data["id"])
|> Map.put("inReplyToAtomUri", object["inReplyToAtomUri"] || in_reply_to_id)
|> Map.put("inReplyToStatusId", activity.id)
|> Map.put("conversation", replied_object.data["context"] || object["conversation"])
|> Map.put("context", replied_object.data["context"] || object["conversation"])
else
e ->
Logger.error("Couldn't fetch #{object["inReplyTo"]} #{inspect(e)}")
object
end
e ->
Logger.error("Couldn't fetch #{object["inReplyTo"]} #{inspect(e)}")
object
end
end
def fix_in_reply_to(object), do: object
def fix_context(object) do
object
|> Map.put("context", object["conversation"])
end
def fix_attachments(object) do
attachments =
(object["attachment"] || [])
|> Enum.map(fn data ->
url = [%{"type" => "Link", "mediaType" => data["mediaType"], "href" => data["url"]}]
Map.put(data, "url", url)
end)
object
|> Map.put("attachment", attachments)
end
def fix_emoji(object) do
tags = object["tag"] || []
emoji = tags |> Enum.filter(fn data -> data["type"] == "Emoji" and data["icon"] end)
emoji =
emoji
|> Enum.reduce(%{}, fn data, mapping ->
name = data["name"]
name =
if String.starts_with?(name, ":") do
name |> String.slice(1..-2)
else
name
end
mapping |> Map.put(name, data["icon"]["url"])
end)
# we merge mastodon and pleroma emoji into a single mapping, to allow for both wire formats
emoji = Map.merge(object["emoji"] || %{}, emoji)
object
|> Map.put("emoji", emoji)
end
def fix_tag(object) do
tags =
(object["tag"] || [])
|> Enum.filter(fn data -> data["type"] == "Hashtag" and data["name"] end)
|> Enum.map(fn data -> String.slice(data["name"], 1..-1) end)
combined = (object["tag"] || []) ++ tags
object
|> Map.put("tag", combined)
end
# content map usually only has one language so this will do for now.
def fix_content_map(%{"contentMap" => content_map} = object) do
content_groups = Map.to_list(content_map)
{_, content} = Enum.at(content_groups, 0)
object
|> Map.put("content", content)
end
def fix_content_map(object), do: object
# TODO: validate those with a Ecto scheme
# - tags
# - emoji
def handle_incoming(%{"type" => "Create", "object" => %{"type" => objtype} = object} = data)
when objtype in ["Article", "Note"] do
with nil <- Activity.get_create_activity_by_object_ap_id(object["id"]),
%User{} = user <- User.get_or_fetch_by_ap_id(data["actor"]) do
# prefer the activity's actor instead of attributedTo
object =
fix_object(data["object"])
|> Map.put("actor", data["actor"])
params = %{
to: data["to"],
object: object,
actor: user,
context: object["conversation"],
local: false,
published: data["published"],
additional:
Map.take(data, [
"cc",
"id"
])
}
ActivityPub.create(params)
else
%Activity{} = activity -> {:ok, activity}
_e -> :error
end
end
def handle_incoming(
%{"type" => "Follow", "object" => followed, "actor" => follower, "id" => id} = data
) do
with %User{local: true} = followed <- User.get_cached_by_ap_id(followed),
%User{} = follower <- User.get_or_fetch_by_ap_id(follower),
{:ok, activity} <- ActivityPub.follow(follower, followed, id, false) do
if not User.locked?(followed) do
ActivityPub.accept(%{
to: [follower.ap_id],
actor: followed.ap_id,
object: data,
local: true
})
User.follow(follower, followed)
end
{:ok, activity}
else
_e -> :error
end
end
defp mastodon_follow_hack(%{"id" => id, "actor" => follower_id}, followed) do
with true <- id =~ "follows",
%User{local: true} = follower <- User.get_cached_by_ap_id(follower_id),
%Activity{} = activity <- Utils.fetch_latest_follow(follower, followed) do
{:ok, activity}
else
_ -> {:error, nil}
end
end
defp mastodon_follow_hack(_), do: {:error, nil}
defp get_follow_activity(follow_object, followed) do
with object_id when not is_nil(object_id) <- Utils.get_ap_id(follow_object),
{_, %Activity{} = activity} <- {:activity, Activity.get_by_ap_id(object_id)} do
{:ok, activity}
else
# Can't find the activity. This might a Mastodon 2.3 "Accept"
{:activity, nil} ->
mastodon_follow_hack(follow_object, followed)
_ ->
{:error, nil}
end
end
def handle_incoming(
%{"type" => "Accept", "object" => follow_object, "actor" => actor, "id" => id} = data
) do
with %User{} = followed <- User.get_or_fetch_by_ap_id(actor),
{:ok, follow_activity} <- get_follow_activity(follow_object, followed),
%User{local: true} = follower <- User.get_cached_by_ap_id(follow_activity.data["actor"]),
{:ok, activity} <-
ActivityPub.accept(%{
to: follow_activity.data["to"],
type: "Accept",
actor: followed.ap_id,
object: follow_activity.data["id"],
local: false
}) do
if not User.following?(follower, followed) do
{:ok, follower} = User.follow(follower, followed)
end
{:ok, activity}
else
_e -> :error
end
end
def handle_incoming(
%{"type" => "Reject", "object" => follow_object, "actor" => actor, "id" => id} = data
) do
with %User{} = followed <- User.get_or_fetch_by_ap_id(actor),
{:ok, follow_activity} <- get_follow_activity(follow_object, followed),
%User{local: true} = follower <- User.get_cached_by_ap_id(follow_activity.data["actor"]),
{:ok, activity} <-
ActivityPub.accept(%{
to: follow_activity.data["to"],
type: "Accept",
actor: followed.ap_id,
object: follow_activity.data["id"],
local: false
}) do
User.unfollow(follower, followed)
{:ok, activity}
else
_e -> :error
end
end
def handle_incoming(
%{"type" => "Like", "object" => object_id, "actor" => actor, "id" => id} = _data
) do
with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
{:ok, object} <-
get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
{:ok, activity, _object} <- ActivityPub.like(actor, object, id, false) do
{:ok, activity}
else
_e -> :error
end
end
def handle_incoming(
%{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = _data
) do
with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
{:ok, object} <-
get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
{:ok, activity, _object} <- ActivityPub.announce(actor, object, id, false) do
{:ok, activity}
else
_e -> :error
end
end
def handle_incoming(
%{"type" => "Update", "object" => %{"type" => "Person"} = object, "actor" => actor_id} =
data
) do
with %User{ap_id: ^actor_id} = actor <- User.get_by_ap_id(object["id"]) do
{:ok, new_user_data} = ActivityPub.user_data_from_user_object(object)
banner = new_user_data[:info]["banner"]
locked = new_user_data[:info]["locked"] || false
update_data =
new_user_data
|> Map.take([:name, :bio, :avatar])
|> Map.put(:info, Map.merge(actor.info, %{"banner" => banner, "locked" => locked}))
actor
|> User.upgrade_changeset(update_data)
|> User.update_and_set_cache()
ActivityPub.update(%{
local: false,
to: data["to"] || [],
cc: data["cc"] || [],
object: object,
actor: actor_id
})
else
e ->
Logger.error(e)
:error
end
end
# TODO: Make secure.
def handle_incoming(
%{"type" => "Delete", "object" => object_id, "actor" => actor, "id" => _id} = _data
) do
object_id = Utils.get_ap_id(object_id)
with %User{} = _actor <- User.get_or_fetch_by_ap_id(actor),
{:ok, object} <-
get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
{:ok, activity} <- ActivityPub.delete(object, false) do
{:ok, activity}
else
_e -> :error
end
end
def handle_incoming(
%{
"type" => "Undo",
"object" => %{"type" => "Announce", "object" => object_id},
"actor" => actor,
"id" => id
} = _data
) do
with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
{:ok, object} <-
get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
{:ok, activity, _} <- ActivityPub.unannounce(actor, object, id, false) do
{:ok, activity}
else
_e -> :error
end
end
def handle_incoming(
%{
"type" => "Undo",
"object" => %{"type" => "Follow", "object" => followed},
"actor" => follower,
"id" => id
} = _data
) do
with %User{local: true} = followed <- User.get_cached_by_ap_id(followed),
%User{} = follower <- User.get_or_fetch_by_ap_id(follower),
{:ok, activity} <- ActivityPub.unfollow(follower, followed, id, false) do
User.unfollow(follower, followed)
{:ok, activity}
else
e -> :error
end
end
@ap_config Application.get_env(:pleroma, :activitypub)
@accept_blocks Keyword.get(@ap_config, :accept_blocks)
def handle_incoming(
%{
"type" => "Undo",
"object" => %{"type" => "Block", "object" => blocked},
"actor" => blocker,
"id" => id
} = _data
) do
with true <- @accept_blocks,
%User{local: true} = blocked <- User.get_cached_by_ap_id(blocked),
%User{} = blocker <- User.get_or_fetch_by_ap_id(blocker),
{:ok, activity} <- ActivityPub.unblock(blocker, blocked, id, false) do
User.unblock(blocker, blocked)
{:ok, activity}
else
e -> :error
end
end
def handle_incoming(
%{"type" => "Block", "object" => blocked, "actor" => blocker, "id" => id} = data
) do
with true <- @accept_blocks,
%User{local: true} = blocked = User.get_cached_by_ap_id(blocked),
%User{} = blocker = User.get_or_fetch_by_ap_id(blocker),
{:ok, activity} <- ActivityPub.block(blocker, blocked, id, false) do
User.unfollow(blocker, blocked)
User.block(blocker, blocked)
{:ok, activity}
else
e -> :error
end
end
def handle_incoming(
%{
"type" => "Undo",
"object" => %{"type" => "Like", "object" => object_id},
"actor" => actor,
"id" => id
} = _data
) do
with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
{:ok, object} <-
get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
{:ok, activity, _, _} <- ActivityPub.unlike(actor, object, id, false) do
{:ok, activity}
else
_e -> :error
end
end
def handle_incoming(_), do: :error
def get_obj_helper(id) do
- if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil
+ if object = Object.normalize(id), do: {:ok, object}, else: nil
end
def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do
with false <- String.starts_with?(inReplyTo, "http"),
{:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do
Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo)
else
_e -> object
end
end
def set_reply_to_uri(obj), do: obj
# Prepares the object of an outgoing create activity.
def prepare_object(object) do
object
|> set_sensitive
|> add_hashtags
|> add_mention_tags
|> add_emoji_tags
|> add_attributed_to
|> prepare_attachments
|> set_conversation
|> set_reply_to_uri
end
# @doc
# """
# internal -> Mastodon
# """
def prepare_outgoing(%{"type" => "Create", "object" => %{"type" => "Note"} = object} = data) do
object =
object
|> prepare_object
data =
data
|> Map.put("object", object)
|> Map.put("@context", "https://www.w3.org/ns/activitystreams")
{:ok, data}
end
# Mastodon Accept/Reject requires a non-normalized object containing the actor URIs,
# because of course it does.
def prepare_outgoing(%{"type" => "Accept"} = data) do
- follow_activity_id =
- if is_binary(data["object"]) do
- data["object"]
- else
- data["object"]["id"]
- end
-
- with follow_activity <- Activity.get_by_ap_id(follow_activity_id) do
+ with follow_activity <- Activity.normalize(data["object"]) do
object = %{
"actor" => follow_activity.actor,
"object" => follow_activity.data["object"],
"id" => follow_activity.data["id"],
"type" => "Follow"
}
data =
data
|> Map.put("object", object)
|> Map.put("@context", "https://www.w3.org/ns/activitystreams")
{:ok, data}
end
end
def prepare_outgoing(%{"type" => "Reject"} = data) do
- follow_activity_id =
- if is_binary(data["object"]) do
- data["object"]
- else
- data["object"]["id"]
- end
-
- with follow_activity <- Activity.get_by_ap_id(follow_activity_id) do
+ with follow_activity <- Activity.normalize(data["object"]) do
object = %{
"actor" => follow_activity.actor,
"object" => follow_activity.data["object"],
"id" => follow_activity.data["id"],
"type" => "Follow"
}
data =
data
|> Map.put("object", object)
|> Map.put("@context", "https://www.w3.org/ns/activitystreams")
{:ok, data}
end
end
def prepare_outgoing(%{"type" => _type} = data) do
data =
data
|> maybe_fix_object_url
|> Map.put("@context", "https://www.w3.org/ns/activitystreams")
{:ok, data}
end
def maybe_fix_object_url(data) do
if is_binary(data["object"]) and not String.starts_with?(data["object"], "http") do
case ActivityPub.fetch_object_from_id(data["object"]) do
{:ok, relative_object} ->
if relative_object.data["external_url"] do
_data =
data
|> Map.put("object", relative_object.data["external_url"])
else
data
end
e ->
Logger.error("Couldn't fetch #{data["object"]} #{inspect(e)}")
data
end
else
data
end
end
def add_hashtags(object) do
tags =
(object["tag"] || [])
|> Enum.map(fn tag ->
%{
"href" => Pleroma.Web.Endpoint.url() <> "/tags/#{tag}",
"name" => "##{tag}",
"type" => "Hashtag"
}
end)
object
|> Map.put("tag", tags)
end
def add_mention_tags(object) do
recipients = object["to"] ++ (object["cc"] || [])
mentions =
recipients
|> Enum.map(fn ap_id -> User.get_cached_by_ap_id(ap_id) end)
|> Enum.filter(& &1)
|> Enum.map(fn user ->
%{"type" => "Mention", "href" => user.ap_id, "name" => "@#{user.nickname}"}
end)
tags = object["tag"] || []
object
|> Map.put("tag", tags ++ mentions)
end
# TODO: we should probably send mtime instead of unix epoch time for updated
def add_emoji_tags(object) do
tags = object["tag"] || []
emoji = object["emoji"] || []
out =
emoji
|> Enum.map(fn {name, url} ->
%{
"icon" => %{"url" => url, "type" => "Image"},
"name" => ":" <> name <> ":",
"type" => "Emoji",
"updated" => "1970-01-01T00:00:00Z",
"id" => url
}
end)
object
|> Map.put("tag", tags ++ out)
end
def set_conversation(object) do
Map.put(object, "conversation", object["context"])
end
def set_sensitive(object) do
tags = object["tag"] || []
Map.put(object, "sensitive", "nsfw" in tags)
end
def add_attributed_to(object) do
attributedTo = object["attributedTo"] || object["actor"]
object
|> Map.put("attributedTo", attributedTo)
end
def prepare_attachments(object) do
attachments =
(object["attachment"] || [])
|> Enum.map(fn data ->
[%{"mediaType" => media_type, "href" => href} | _] = data["url"]
%{"url" => href, "mediaType" => media_type, "name" => data["name"], "type" => "Document"}
end)
object
|> Map.put("attachment", attachments)
end
defp user_upgrade_task(user) do
old_follower_address = User.ap_followers(user)
q =
from(
u in User,
where: ^old_follower_address in u.following,
update: [
set: [
following:
fragment(
"array_replace(?,?,?)",
u.following,
^old_follower_address,
^user.follower_address
)
]
]
)
Repo.update_all(q, [])
maybe_retire_websub(user.ap_id)
# Only do this for recent activties, don't go through the whole db.
# Only look at the last 1000 activities.
since = (Repo.aggregate(Activity, :max, :id) || 0) - 1_000
q =
from(
a in Activity,
where: ^old_follower_address in a.recipients,
where: a.id > ^since,
update: [
set: [
recipients:
fragment(
"array_replace(?,?,?)",
a.recipients,
^old_follower_address,
^user.follower_address
)
]
]
)
Repo.update_all(q, [])
end
def upgrade_user_from_ap_id(ap_id, async \\ true) do
with %User{local: false} = user <- User.get_by_ap_id(ap_id),
{:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id) do
data =
data
|> Map.put(:info, Map.merge(user.info, data[:info]))
already_ap = User.ap_enabled?(user)
{:ok, user} =
User.upgrade_changeset(user, data)
|> Repo.update()
if !already_ap do
# This could potentially take a long time, do it in the background
if async do
Task.start(fn ->
user_upgrade_task(user)
end)
else
user_upgrade_task(user)
end
end
{:ok, user}
else
e -> e
end
end
def maybe_retire_websub(ap_id) do
# some sanity checks
if is_binary(ap_id) && String.length(ap_id) > 8 do
q =
from(
ws in Pleroma.Web.Websub.WebsubClientSubscription,
where: fragment("? like ?", ws.topic, ^"#{ap_id}%")
)
Repo.delete_all(q)
end
end
def maybe_fix_user_url(data) do
if is_map(data["url"]) do
Map.put(data, "url", data["url"]["href"])
else
data
end
end
def maybe_fix_user_object(data) do
data
|> maybe_fix_user_url
end
end
diff --git a/lib/pleroma/web/common_api/common_api.ex b/lib/pleroma/web/common_api/common_api.ex
index 8845419c2..3f18a68e8 100644
--- a/lib/pleroma/web/common_api/common_api.ex
+++ b/lib/pleroma/web/common_api/common_api.ex
@@ -1,129 +1,129 @@
defmodule Pleroma.Web.CommonAPI do
alias Pleroma.{Repo, Activity, Object}
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Formatter
import Pleroma.Web.CommonAPI.Utils
def delete(activity_id, user) do
with %Activity{data: %{"object" => %{"id" => object_id}}} <- Repo.get(Activity, activity_id),
- %Object{} = object <- Object.get_by_ap_id(object_id),
+ %Object{} = object <- Object.normalize(object_id),
true <- user.info["is_moderator"] || user.ap_id == object.data["actor"],
{:ok, delete} <- ActivityPub.delete(object) do
{:ok, delete}
end
end
def repeat(id_or_ap_id, user) do
with %Activity{} = activity <- get_by_id_or_ap_id(id_or_ap_id),
- object <- Object.get_by_ap_id(activity.data["object"]["id"]) do
+ object <- Object.normalize(activity.data["object"]["id"]) do
ActivityPub.announce(user, object)
else
_ ->
{:error, "Could not repeat"}
end
end
def unrepeat(id_or_ap_id, user) do
with %Activity{} = activity <- get_by_id_or_ap_id(id_or_ap_id),
- object <- Object.get_by_ap_id(activity.data["object"]["id"]) do
+ object <- Object.normalize(activity.data["object"]["id"]) do
ActivityPub.unannounce(user, object)
else
_ ->
{:error, "Could not unrepeat"}
end
end
def favorite(id_or_ap_id, user) do
with %Activity{} = activity <- get_by_id_or_ap_id(id_or_ap_id),
false <- activity.data["actor"] == user.ap_id,
- object <- Object.get_by_ap_id(activity.data["object"]["id"]) do
+ object <- Object.normalize(activity.data["object"]["id"]) do
ActivityPub.like(user, object)
else
_ ->
{:error, "Could not favorite"}
end
end
def unfavorite(id_or_ap_id, user) do
with %Activity{} = activity <- get_by_id_or_ap_id(id_or_ap_id),
false <- activity.data["actor"] == user.ap_id,
- object <- Object.get_by_ap_id(activity.data["object"]["id"]) do
+ object <- Object.normalize(activity.data["object"]["id"]) do
ActivityPub.unlike(user, object)
else
_ ->
{:error, "Could not unfavorite"}
end
end
def get_visibility(%{"visibility" => visibility})
when visibility in ~w{public unlisted private direct},
do: visibility
def get_visibility(%{"in_reply_to_status_id" => status_id}) when not is_nil(status_id) do
inReplyTo = get_replied_to_activity(status_id)
Pleroma.Web.MastodonAPI.StatusView.get_visibility(inReplyTo.data["object"])
end
def get_visibility(_), do: "public"
@instance Application.get_env(:pleroma, :instance)
@limit Keyword.get(@instance, :limit)
def post(user, %{"status" => status} = data) do
visibility = get_visibility(data)
with status <- String.trim(status),
length when length in 1..@limit <- String.length(status),
attachments <- attachments_from_ids(data["media_ids"]),
mentions <- Formatter.parse_mentions(status),
inReplyTo <- get_replied_to_activity(data["in_reply_to_status_id"]),
{to, cc} <- to_for_user_and_mentions(user, mentions, inReplyTo, visibility),
tags <- Formatter.parse_tags(status, data),
content_html <-
make_content_html(status, mentions, attachments, tags, data["no_attachment_links"]),
context <- make_context(inReplyTo),
cw <- data["spoiler_text"],
object <-
make_note_data(
user.ap_id,
to,
context,
content_html,
attachments,
inReplyTo,
tags,
cw,
cc
),
object <-
Map.put(
object,
"emoji",
Formatter.get_emoji(status)
|> Enum.reduce(%{}, fn {name, file}, acc ->
Map.put(acc, name, "#{Pleroma.Web.Endpoint.static_url()}#{file}")
end)
) do
res =
ActivityPub.create(%{
to: to,
actor: user,
context: context,
object: object,
additional: %{"cc" => cc}
})
res
end
end
def update(user) do
ActivityPub.update(%{
local: true,
to: [user.follower_address],
cc: [],
actor: user.ap_id,
object: Pleroma.Web.ActivityPub.UserView.render("user.json", %{user: user})
})
end
end
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index 8ca530031..ccefb0bdf 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -1,212 +1,212 @@
defmodule Pleroma.Web.Federator do
use GenServer
alias Pleroma.User
alias Pleroma.Activity
alias Pleroma.Web.{WebFinger, Websub}
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
require Logger
@websub Application.get_env(:pleroma, :websub)
@ostatus Application.get_env(:pleroma, :ostatus)
@httpoison Application.get_env(:pleroma, :httpoison)
@instance Application.get_env(:pleroma, :instance)
@federating Keyword.get(@instance, :federating)
@max_jobs 20
def init(args) do
{:ok, args}
end
def start_link do
spawn(fn ->
# 1 minute
Process.sleep(1000 * 60 * 1)
enqueue(:refresh_subscriptions, nil)
end)
GenServer.start_link(
__MODULE__,
%{
in: {:sets.new(), []},
out: {:sets.new(), []}
},
name: __MODULE__
)
end
def handle(:refresh_subscriptions, _) do
Logger.debug("Federator running refresh subscriptions")
Websub.refresh_subscriptions()
spawn(fn ->
# 6 hours
Process.sleep(1000 * 60 * 60 * 6)
enqueue(:refresh_subscriptions, nil)
end)
end
def handle(:request_subscription, websub) do
Logger.debug("Refreshing #{websub.topic}")
with {:ok, websub} <- Websub.request_subscription(websub) do
Logger.debug("Successfully refreshed #{websub.topic}")
else
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end
def handle(:publish, activity) do
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do
{:ok, actor} = WebFinger.ensure_keys_present(actor)
if ActivityPub.is_public?(activity) do
Logger.info(fn -> "Sending #{activity.data["id"]} out via WebSub" end)
Websub.publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
Logger.info(fn -> "Sending #{activity.data["id"]} out via Salmon" end)
Pleroma.Web.Salmon.publish(actor, activity)
end
Logger.info(fn -> "Sending #{activity.data["id"]} out via AP" end)
Pleroma.Web.ActivityPub.ActivityPub.publish(actor, activity)
end
end
def handle(:verify_websub, websub) do
Logger.debug(fn ->
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
end)
@websub.verify(websub)
end
def handle(:incoming_doc, doc) do
Logger.info("Got document, trying to parse")
@ostatus.handle_incoming(doc)
end
def handle(:incoming_ap_doc, params) do
Logger.info("Handling incoming AP activity")
params = Utils.normalize_params(params)
with {:ok, _user} <- ap_enabled_actor(params["actor"]),
- nil <- Activity.get_by_ap_id(params["id"]),
+ nil <- Activity.normalize(params["id"]),
{:ok, _activity} <- Transmogrifier.handle_incoming(params) do
else
%Activity{} ->
Logger.info("Already had #{params["id"]}")
_e ->
# Just drop those for now
Logger.info("Unhandled activity")
Logger.info(Poison.encode!(params, pretty: 2))
end
end
def handle(:publish_single_ap, params) do
ActivityPub.publish_one(params)
end
def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do
signature = @websub.sign(secret || "", xml)
Logger.debug(fn -> "Pushing #{topic} to #{callback}" end)
with {:ok, %{status_code: code}} <-
@httpoison.post(
callback,
xml,
[
{"Content-Type", "application/atom+xml"},
{"X-Hub-Signature", "sha1=#{signature}"}
],
timeout: 10000,
recv_timeout: 20000,
hackney: [pool: :default]
) do
Logger.debug(fn -> "Pushed to #{callback}, code #{code}" end)
else
e ->
Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
end
end
def handle(type, _) do
Logger.debug(fn -> "Unknown task: #{type}" end)
{:error, "Don't know what to do with this"}
end
def enqueue(type, payload, priority \\ 1) do
if @federating do
if Mix.env() == :test do
handle(type, payload)
else
GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
end
end
end
def maybe_start_job(running_jobs, queue) do
if :sets.size(running_jobs) < @max_jobs && queue != [] do
{{type, payload}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> handle(type, payload) end)
mref = Process.monitor(pid)
{:sets.add_element(mref, running_jobs), queue}
else
{running_jobs, queue}
end
end
def handle_cast({:enqueue, type, payload, _priority}, state)
when type in [:incoming_doc, :incoming_ap_doc] do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
{i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end
def handle_cast({:enqueue, type, payload, _priority}, state) do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
{o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end
def handle_cast(m, state) do
IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state}
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
i_running_jobs = :sets.del_element(ref, i_running_jobs)
o_running_jobs = :sets.del_element(ref, o_running_jobs)
{i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
{o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end
def enqueue_sorted(queue, element, priority) do
[%{item: element, priority: priority} | queue]
|> Enum.sort_by(fn %{priority: priority} -> priority end)
end
def queue_pop([%{item: element} | queue]) do
{element, queue}
end
def ap_enabled_actor(id) do
user = User.get_by_ap_id(id)
if User.ap_enabled?(user) do
{:ok, user}
else
ActivityPub.make_user_from_ap_id(id)
end
end
end
diff --git a/lib/pleroma/web/ostatus/activity_representer.ex b/lib/pleroma/web/ostatus/activity_representer.ex
index 4179d86c9..5d831459b 100644
--- a/lib/pleroma/web/ostatus/activity_representer.ex
+++ b/lib/pleroma/web/ostatus/activity_representer.ex
@@ -1,306 +1,306 @@
defmodule Pleroma.Web.OStatus.ActivityRepresenter do
alias Pleroma.{Activity, User, Object}
alias Pleroma.Web.OStatus.UserRepresenter
require Logger
defp get_href(id) do
with %Object{data: %{"external_url" => external_url}} <- Object.get_cached_by_ap_id(id) do
external_url
else
_e -> id
end
end
defp get_in_reply_to(%{"object" => %{"inReplyTo" => in_reply_to}}) do
[
{:"thr:in-reply-to",
[ref: to_charlist(in_reply_to), href: to_charlist(get_href(in_reply_to))], []}
]
end
defp get_in_reply_to(_), do: []
defp get_mentions(to) do
Enum.map(to, fn id ->
cond do
# Special handling for the AP/Ostatus public collections
"https://www.w3.org/ns/activitystreams#Public" == id ->
{:link,
[
rel: "mentioned",
"ostatus:object-type": "http://activitystrea.ms/schema/1.0/collection",
href: "http://activityschema.org/collection/public"
], []}
# Ostatus doesn't handle follower collections, ignore these.
Regex.match?(~r/^#{Pleroma.Web.base_url()}.+followers$/, id) ->
[]
true ->
{:link,
[
rel: "mentioned",
"ostatus:object-type": "http://activitystrea.ms/schema/1.0/person",
href: id
], []}
end
end)
end
defp get_links(%{local: true, data: data}) do
h = fn str -> [to_charlist(str)] end
[
{:link, [type: ['application/atom+xml'], href: h.(data["object"]["id"]), rel: 'self'], []},
{:link, [type: ['text/html'], href: h.(data["object"]["id"]), rel: 'alternate'], []}
]
end
defp get_links(%{
local: false,
data: %{
"object" => %{
"external_url" => external_url
}
}
}) do
h = fn str -> [to_charlist(str)] end
[
{:link, [type: ['text/html'], href: h.(external_url), rel: 'alternate'], []}
]
end
defp get_links(_activity), do: []
defp get_emoji_links(emojis) do
Enum.map(emojis, fn {emoji, file} ->
{:link, [name: to_charlist(emoji), rel: 'emoji', href: to_charlist(file)], []}
end)
end
def to_simple_form(activity, user, with_author \\ false)
def to_simple_form(%{data: %{"object" => %{"type" => "Note"}}} = activity, user, with_author) do
h = fn str -> [to_charlist(str)] end
updated_at = activity.data["object"]["published"]
inserted_at = activity.data["object"]["published"]
attachments =
Enum.map(activity.data["object"]["attachment"] || [], fn attachment ->
url = hd(attachment["url"])
{:link,
[rel: 'enclosure', href: to_charlist(url["href"]), type: to_charlist(url["mediaType"])],
[]}
end)
in_reply_to = get_in_reply_to(activity.data)
author = if with_author, do: [{:author, UserRepresenter.to_simple_form(user)}], else: []
mentions = activity.recipients |> get_mentions
categories =
(activity.data["object"]["tag"] || [])
|> Enum.map(fn tag ->
if is_binary(tag) do
{:category, [term: to_charlist(tag)], []}
else
nil
end
end)
|> Enum.filter(& &1)
emoji_links = get_emoji_links(activity.data["object"]["emoji"] || %{})
summary =
if activity.data["object"]["summary"] do
[{:summary, [], h.(activity.data["object"]["summary"])}]
else
[]
end
[
{:"activity:object-type", ['http://activitystrea.ms/schema/1.0/note']},
{:"activity:verb", ['http://activitystrea.ms/schema/1.0/post']},
# For notes, federate the object id.
{:id, h.(activity.data["object"]["id"])},
{:title, ['New note by #{user.nickname}']},
{:content, [type: 'html'],
h.(activity.data["object"]["content"] |> String.replace(~r/[\n\r]/, ""))},
{:published, h.(inserted_at)},
{:updated, h.(updated_at)},
{:"ostatus:conversation", [ref: h.(activity.data["context"])],
h.(activity.data["context"])},
{:link, [ref: h.(activity.data["context"]), rel: 'ostatus:conversation'], []}
] ++
summary ++
get_links(activity) ++
categories ++ attachments ++ in_reply_to ++ author ++ mentions ++ emoji_links
end
def to_simple_form(%{data: %{"type" => "Like"}} = activity, user, with_author) do
h = fn str -> [to_charlist(str)] end
updated_at = activity.data["published"]
inserted_at = activity.data["published"]
_in_reply_to = get_in_reply_to(activity.data)
author = if with_author, do: [{:author, UserRepresenter.to_simple_form(user)}], else: []
mentions = activity.recipients |> get_mentions
[
{:"activity:verb", ['http://activitystrea.ms/schema/1.0/favorite']},
{:id, h.(activity.data["id"])},
{:title, ['New favorite by #{user.nickname}']},
{:content, [type: 'html'], ['#{user.nickname} favorited something']},
{:published, h.(inserted_at)},
{:updated, h.(updated_at)},
{:"activity:object",
[
{:"activity:object-type", ['http://activitystrea.ms/schema/1.0/note']},
# For notes, federate the object id.
{:id, h.(activity.data["object"])}
]},
{:"ostatus:conversation", [ref: h.(activity.data["context"])],
h.(activity.data["context"])},
{:link, [ref: h.(activity.data["context"]), rel: 'ostatus:conversation'], []},
{:link, [rel: 'self', type: ['application/atom+xml'], href: h.(activity.data["id"])], []},
{:"thr:in-reply-to", [ref: to_charlist(activity.data["object"])], []}
] ++ author ++ mentions
end
def to_simple_form(%{data: %{"type" => "Announce"}} = activity, user, with_author) do
h = fn str -> [to_charlist(str)] end
updated_at = activity.data["published"]
inserted_at = activity.data["published"]
_in_reply_to = get_in_reply_to(activity.data)
author = if with_author, do: [{:author, UserRepresenter.to_simple_form(user)}], else: []
retweeted_activity = Activity.get_create_activity_by_object_ap_id(activity.data["object"])
retweeted_user = User.get_cached_by_ap_id(retweeted_activity.data["actor"])
retweeted_xml = to_simple_form(retweeted_activity, retweeted_user, true)
mentions = activity.recipients |> get_mentions
[
{:"activity:object-type", ['http://activitystrea.ms/schema/1.0/activity']},
{:"activity:verb", ['http://activitystrea.ms/schema/1.0/share']},
{:id, h.(activity.data["id"])},
{:title, ['#{user.nickname} repeated a notice']},
{:content, [type: 'html'], ['RT #{retweeted_activity.data["object"]["content"]}']},
{:published, h.(inserted_at)},
{:updated, h.(updated_at)},
{:"ostatus:conversation", [ref: h.(activity.data["context"])],
h.(activity.data["context"])},
{:link, [ref: h.(activity.data["context"]), rel: 'ostatus:conversation'], []},
{:link, [rel: 'self', type: ['application/atom+xml'], href: h.(activity.data["id"])], []},
{:"activity:object", retweeted_xml}
] ++ mentions ++ author
end
def to_simple_form(%{data: %{"type" => "Follow"}} = activity, user, with_author) do
h = fn str -> [to_charlist(str)] end
updated_at = activity.data["published"]
inserted_at = activity.data["published"]
author = if with_author, do: [{:author, UserRepresenter.to_simple_form(user)}], else: []
mentions = (activity.recipients || []) |> get_mentions
[
{:"activity:object-type", ['http://activitystrea.ms/schema/1.0/activity']},
{:"activity:verb", ['http://activitystrea.ms/schema/1.0/follow']},
{:id, h.(activity.data["id"])},
{:title, ['#{user.nickname} started following #{activity.data["object"]}']},
{:content, [type: 'html'],
['#{user.nickname} started following #{activity.data["object"]}']},
{:published, h.(inserted_at)},
{:updated, h.(updated_at)},
{:"activity:object",
[
{:"activity:object-type", ['http://activitystrea.ms/schema/1.0/person']},
{:id, h.(activity.data["object"])},
{:uri, h.(activity.data["object"])}
]},
{:link, [rel: 'self', type: ['application/atom+xml'], href: h.(activity.data["id"])], []}
] ++ mentions ++ author
end
# Only undos of follow for now. Will need to get redone once there are more
def to_simple_form(
%{data: %{"type" => "Undo", "object" => %{"type" => "Follow"} = follow_activity}} =
activity,
user,
with_author
) do
h = fn str -> [to_charlist(str)] end
updated_at = activity.data["published"]
inserted_at = activity.data["published"]
author = if with_author, do: [{:author, UserRepresenter.to_simple_form(user)}], else: []
mentions = (activity.recipients || []) |> get_mentions
- follow_activity = Activity.get_by_ap_id(follow_activity["id"])
+ follow_activity = Activity.normalize(follow_activity)
[
{:"activity:object-type", ['http://activitystrea.ms/schema/1.0/activity']},
{:"activity:verb", ['http://activitystrea.ms/schema/1.0/unfollow']},
{:id, h.(activity.data["id"])},
{:title, ['#{user.nickname} stopped following #{follow_activity.data["object"]}']},
{:content, [type: 'html'],
['#{user.nickname} stopped following #{follow_activity.data["object"]}']},
{:published, h.(inserted_at)},
{:updated, h.(updated_at)},
{:"activity:object",
[
{:"activity:object-type", ['http://activitystrea.ms/schema/1.0/person']},
{:id, h.(follow_activity.data["object"])},
{:uri, h.(follow_activity.data["object"])}
]},
{:link, [rel: 'self', type: ['application/atom+xml'], href: h.(activity.data["id"])], []}
] ++ mentions ++ author
end
def to_simple_form(%{data: %{"type" => "Delete"}} = activity, user, with_author) do
h = fn str -> [to_charlist(str)] end
updated_at = activity.data["published"]
inserted_at = activity.data["published"]
author = if with_author, do: [{:author, UserRepresenter.to_simple_form(user)}], else: []
[
{:"activity:object-type", ['http://activitystrea.ms/schema/1.0/activity']},
{:"activity:verb", ['http://activitystrea.ms/schema/1.0/delete']},
{:id, h.(activity.data["object"])},
{:title, ['An object was deleted']},
{:content, [type: 'html'], ['An object was deleted']},
{:published, h.(inserted_at)},
{:updated, h.(updated_at)}
] ++ author
end
def to_simple_form(_, _, _), do: nil
def wrap_with_entry(simple_form) do
[
{
:entry,
[
xmlns: 'http://www.w3.org/2005/Atom',
"xmlns:thr": 'http://purl.org/syndication/thread/1.0',
"xmlns:activity": 'http://activitystrea.ms/spec/1.0/',
"xmlns:poco": 'http://portablecontacts.net/spec/1.0',
"xmlns:ostatus": 'http://ostatus.org/schema/1.0'
],
simple_form
}
]
end
end
diff --git a/lib/pleroma/web/ostatus/handlers/delete_handler.ex b/lib/pleroma/web/ostatus/handlers/delete_handler.ex
index 4f3016b65..6330d7f64 100644
--- a/lib/pleroma/web/ostatus/handlers/delete_handler.ex
+++ b/lib/pleroma/web/ostatus/handlers/delete_handler.ex
@@ -1,14 +1,14 @@
defmodule Pleroma.Web.OStatus.DeleteHandler do
require Logger
alias Pleroma.Web.XML
alias Pleroma.Object
alias Pleroma.Web.ActivityPub.ActivityPub
def handle_delete(entry, _doc \\ nil) do
with id <- XML.string_from_xpath("//id", entry),
- object when not is_nil(object) <- Object.get_by_ap_id(id),
+ %Object{} = object <- Object.normalize(id),
{:ok, delete} <- ActivityPub.delete(object, false) do
delete
end
end
end
diff --git a/lib/pleroma/web/ostatus/ostatus.ex b/lib/pleroma/web/ostatus/ostatus.ex
index f0ff0624f..916c894eb 100644
--- a/lib/pleroma/web/ostatus/ostatus.ex
+++ b/lib/pleroma/web/ostatus/ostatus.ex
@@ -1,386 +1,386 @@
defmodule Pleroma.Web.OStatus do
@httpoison Application.get_env(:pleroma, :httpoison)
import Ecto.Query
import Pleroma.Web.XML
require Logger
alias Pleroma.{Repo, User, Web, Object, Activity}
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.{WebFinger, Websub}
alias Pleroma.Web.OStatus.{FollowHandler, UnfollowHandler, NoteHandler, DeleteHandler}
alias Pleroma.Web.ActivityPub.Transmogrifier
def feed_path(user) do
"#{user.ap_id}/feed.atom"
end
def pubsub_path(user) do
"#{Web.base_url()}/push/hub/#{user.nickname}"
end
def salmon_path(user) do
"#{user.ap_id}/salmon"
end
def remote_follow_path do
"#{Web.base_url()}/ostatus_subscribe?acct={uri}"
end
def handle_incoming(xml_string) do
with doc when doc != :error <- parse_document(xml_string) do
entries = :xmerl_xpath.string('//entry', doc)
activities =
Enum.map(entries, fn entry ->
{:xmlObj, :string, object_type} =
:xmerl_xpath.string('string(/entry/activity:object-type[1])', entry)
{:xmlObj, :string, verb} = :xmerl_xpath.string('string(/entry/activity:verb[1])', entry)
Logger.debug("Handling #{verb}")
try do
case verb do
'http://activitystrea.ms/schema/1.0/delete' ->
with {:ok, activity} <- DeleteHandler.handle_delete(entry, doc), do: activity
'http://activitystrea.ms/schema/1.0/follow' ->
with {:ok, activity} <- FollowHandler.handle(entry, doc), do: activity
'http://activitystrea.ms/schema/1.0/unfollow' ->
with {:ok, activity} <- UnfollowHandler.handle(entry, doc), do: activity
'http://activitystrea.ms/schema/1.0/share' ->
with {:ok, activity, retweeted_activity} <- handle_share(entry, doc),
do: [activity, retweeted_activity]
'http://activitystrea.ms/schema/1.0/favorite' ->
with {:ok, activity, favorited_activity} <- handle_favorite(entry, doc),
do: [activity, favorited_activity]
_ ->
case object_type do
'http://activitystrea.ms/schema/1.0/note' ->
with {:ok, activity} <- NoteHandler.handle_note(entry, doc), do: activity
'http://activitystrea.ms/schema/1.0/comment' ->
with {:ok, activity} <- NoteHandler.handle_note(entry, doc), do: activity
_ ->
Logger.error("Couldn't parse incoming document")
nil
end
end
rescue
e ->
Logger.error("Error occured while handling activity")
Logger.error(xml_string)
Logger.error(inspect(e))
nil
end
end)
|> Enum.filter(& &1)
{:ok, activities}
else
_e -> {:error, []}
end
end
def make_share(entry, doc, retweeted_activity) do
with {:ok, actor} <- find_make_or_update_user(doc),
- %Object{} = object <- Object.get_by_ap_id(retweeted_activity.data["object"]["id"]),
+ %Object{} = object <- Object.normalize(retweeted_activity.data["object"]),
id when not is_nil(id) <- string_from_xpath("/entry/id", entry),
{:ok, activity, _object} = ActivityPub.announce(actor, object, id, false) do
{:ok, activity}
end
end
def handle_share(entry, doc) do
with {:ok, retweeted_activity} <- get_or_build_object(entry),
{:ok, activity} <- make_share(entry, doc, retweeted_activity) do
{:ok, activity, retweeted_activity}
else
e -> {:error, e}
end
end
def make_favorite(entry, doc, favorited_activity) do
with {:ok, actor} <- find_make_or_update_user(doc),
- %Object{} = object <- Object.get_by_ap_id(favorited_activity.data["object"]["id"]),
+ %Object{} = object <- Object.normalize(favorited_activity.data["object"]),
id when not is_nil(id) <- string_from_xpath("/entry/id", entry),
{:ok, activity, _object} = ActivityPub.like(actor, object, id, false) do
{:ok, activity}
end
end
def get_or_build_object(entry) do
with {:ok, activity} <- get_or_try_fetching(entry) do
{:ok, activity}
else
_e ->
with [object] <- :xmerl_xpath.string('/entry/activity:object', entry) do
NoteHandler.handle_note(object, object)
end
end
end
def get_or_try_fetching(entry) do
Logger.debug("Trying to get entry from db")
with id when not is_nil(id) <- string_from_xpath("//activity:object[1]/id", entry),
%Activity{} = activity <- Activity.get_create_activity_by_object_ap_id(id) do
{:ok, activity}
else
_ ->
Logger.debug("Couldn't get, will try to fetch")
with href when not is_nil(href) <-
string_from_xpath("//activity:object[1]/link[@type=\"text/html\"]/@href", entry),
{:ok, [favorited_activity]} <- fetch_activity_from_url(href) do
{:ok, favorited_activity}
else
e -> Logger.debug("Couldn't find href: #{inspect(e)}")
end
end
end
def handle_favorite(entry, doc) do
with {:ok, favorited_activity} <- get_or_try_fetching(entry),
{:ok, activity} <- make_favorite(entry, doc, favorited_activity) do
{:ok, activity, favorited_activity}
else
e -> {:error, e}
end
end
def get_attachments(entry) do
:xmerl_xpath.string('/entry/link[@rel="enclosure"]', entry)
|> Enum.map(fn enclosure ->
with href when not is_nil(href) <- string_from_xpath("/link/@href", enclosure),
type when not is_nil(type) <- string_from_xpath("/link/@type", enclosure) do
%{
"type" => "Attachment",
"url" => [
%{
"type" => "Link",
"mediaType" => type,
"href" => href
}
]
}
end
end)
|> Enum.filter(& &1)
end
@doc """
Gets the content from a an entry.
"""
def get_content(entry) do
string_from_xpath("//content", entry)
end
@doc """
Get the cw that mastodon uses.
"""
def get_cw(entry) do
with cw when not is_nil(cw) <- string_from_xpath("/*/summary", entry) do
cw
else
_e -> nil
end
end
def get_tags(entry) do
:xmerl_xpath.string('//category', entry)
|> Enum.map(fn category -> string_from_xpath("/category/@term", category) end)
|> Enum.filter(& &1)
|> Enum.map(&String.downcase/1)
end
def maybe_update(doc, user) do
if "true" == string_from_xpath("//author[1]/ap_enabled", doc) do
Transmogrifier.upgrade_user_from_ap_id(user.ap_id)
else
maybe_update_ostatus(doc, user)
end
end
def maybe_update_ostatus(doc, user) do
old_data = %{
avatar: user.avatar,
bio: user.bio,
name: user.name,
info: user.info
}
with false <- user.local,
avatar <- make_avatar_object(doc),
bio <- string_from_xpath("//author[1]/summary", doc),
name <- string_from_xpath("//author[1]/poco:displayName", doc),
info <-
Map.put(user.info, "banner", make_avatar_object(doc, "header") || user.info["banner"]),
new_data <- %{
avatar: avatar || old_data.avatar,
name: name || old_data.name,
bio: bio || old_data.bio,
info: info || old_data.info
},
false <- new_data == old_data do
change = Ecto.Changeset.change(user, new_data)
Repo.update(change)
else
_ ->
{:ok, user}
end
end
def find_make_or_update_user(doc) do
uri = string_from_xpath("//author/uri[1]", doc)
with {:ok, user} <- find_or_make_user(uri) do
maybe_update(doc, user)
end
end
def find_or_make_user(uri) do
query = from(user in User, where: user.ap_id == ^uri)
user = Repo.one(query)
if is_nil(user) do
make_user(uri)
else
{:ok, user}
end
end
def make_user(uri, update \\ false) do
with {:ok, info} <- gather_user_info(uri) do
data = %{
name: info["name"],
nickname: info["nickname"] <> "@" <> info["host"],
ap_id: info["uri"],
info: info,
avatar: info["avatar"],
bio: info["bio"]
}
with false <- update,
%User{} = user <- User.get_by_ap_id(data.ap_id) do
{:ok, user}
else
_e -> User.insert_or_update_user(data)
end
end
end
# TODO: Just takes the first one for now.
def make_avatar_object(author_doc, rel \\ "avatar") do
href = string_from_xpath("//author[1]/link[@rel=\"#{rel}\"]/@href", author_doc)
type = string_from_xpath("//author[1]/link[@rel=\"#{rel}\"]/@type", author_doc)
if href do
%{
"type" => "Image",
"url" => [
%{
"type" => "Link",
"mediaType" => type,
"href" => href
}
]
}
else
nil
end
end
def gather_user_info(username) do
with {:ok, webfinger_data} <- WebFinger.finger(username),
{:ok, feed_data} <- Websub.gather_feed_data(webfinger_data["topic"]) do
{:ok, Map.merge(webfinger_data, feed_data) |> Map.put("fqn", username)}
else
e ->
Logger.debug(fn -> "Couldn't gather info for #{username}" end)
{:error, e}
end
end
# Regex-based 'parsing' so we don't have to pull in a full html parser
# It's a hack anyway. Maybe revisit this in the future
@mastodon_regex ~r/<link href='(.*)' rel='alternate' type='application\/atom\+xml'>/
@gs_regex ~r/<link title=.* href="(.*)" type="application\/atom\+xml" rel="alternate">/
@gs_classic_regex ~r/<link rel="alternate" href="(.*)" type="application\/atom\+xml" title=.*>/
def get_atom_url(body) do
cond do
Regex.match?(@mastodon_regex, body) ->
[[_, match]] = Regex.scan(@mastodon_regex, body)
{:ok, match}
Regex.match?(@gs_regex, body) ->
[[_, match]] = Regex.scan(@gs_regex, body)
{:ok, match}
Regex.match?(@gs_classic_regex, body) ->
[[_, match]] = Regex.scan(@gs_classic_regex, body)
{:ok, match}
true ->
Logger.debug(fn -> "Couldn't find Atom link in #{inspect(body)}" end)
{:error, "Couldn't find the Atom link"}
end
end
def fetch_activity_from_atom_url(url) do
with true <- String.starts_with?(url, "http"),
{:ok, %{body: body, status_code: code}} when code in 200..299 <-
@httpoison.get(
url,
[Accept: "application/atom+xml"],
follow_redirect: true,
timeout: 10000,
recv_timeout: 20000
) do
Logger.debug("Got document from #{url}, handling...")
handle_incoming(body)
else
e ->
Logger.debug("Couldn't get #{url}: #{inspect(e)}")
e
end
end
def fetch_activity_from_html_url(url) do
Logger.debug("Trying to fetch #{url}")
with true <- String.starts_with?(url, "http"),
{:ok, %{body: body}} <-
@httpoison.get(url, [], follow_redirect: true, timeout: 10000, recv_timeout: 20000),
{:ok, atom_url} <- get_atom_url(body) do
fetch_activity_from_atom_url(atom_url)
else
e ->
Logger.debug("Couldn't get #{url}: #{inspect(e)}")
e
end
end
def fetch_activity_from_url(url) do
try do
with {:ok, activities} when length(activities) > 0 <- fetch_activity_from_atom_url(url) do
{:ok, activities}
else
_e ->
with {:ok, activities} <- fetch_activity_from_html_url(url) do
{:ok, activities}
end
end
rescue
e ->
Logger.debug("Couldn't get #{url}: #{inspect(e)}")
{:error, "Couldn't get #{url}: #{inspect(e)}"}
end
end
end
diff --git a/lib/pleroma/web/ostatus/ostatus_controller.ex b/lib/pleroma/web/ostatus/ostatus_controller.ex
index 2f72fdb16..00bffbd5d 100644
--- a/lib/pleroma/web/ostatus/ostatus_controller.ex
+++ b/lib/pleroma/web/ostatus/ostatus_controller.ex
@@ -1,178 +1,178 @@
defmodule Pleroma.Web.OStatus.OStatusController do
use Pleroma.Web, :controller
alias Pleroma.{User, Activity}
alias Pleroma.Web.OStatus.{FeedRepresenter, ActivityRepresenter}
alias Pleroma.Repo
alias Pleroma.Web.{OStatus, Federator}
alias Pleroma.Web.XML
alias Pleroma.Web.ActivityPub.ActivityPubController
alias Pleroma.Web.ActivityPub.ActivityPub
action_fallback(:errors)
def feed_redirect(conn, %{"nickname" => nickname}) do
case get_format(conn) do
"html" ->
Fallback.RedirectController.redirector(conn, nil)
"activity+json" ->
ActivityPubController.call(conn, :user)
_ ->
with %User{} = user <- User.get_cached_by_nickname(nickname) do
redirect(conn, external: OStatus.feed_path(user))
else
nil -> {:error, :not_found}
end
end
end
def feed(conn, %{"nickname" => nickname} = params) do
with %User{} = user <- User.get_cached_by_nickname(nickname) do
query_params =
Map.take(params, ["max_id"])
|> Map.merge(%{"whole_db" => true, "actor_id" => user.ap_id})
activities =
ActivityPub.fetch_public_activities(query_params)
|> Enum.reverse()
response =
user
|> FeedRepresenter.to_simple_form(activities, [user])
|> :xmerl.export_simple(:xmerl_xml)
|> to_string
conn
|> put_resp_content_type("application/atom+xml")
|> send_resp(200, response)
else
nil -> {:error, :not_found}
end
end
defp decode_or_retry(body) do
with {:ok, magic_key} <- Pleroma.Web.Salmon.fetch_magic_key(body),
{:ok, doc} <- Pleroma.Web.Salmon.decode_and_validate(magic_key, body) do
{:ok, doc}
else
_e ->
with [decoded | _] <- Pleroma.Web.Salmon.decode(body),
doc <- XML.parse_document(decoded),
uri when not is_nil(uri) <- XML.string_from_xpath("/entry/author[1]/uri", doc),
{:ok, _} <- Pleroma.Web.OStatus.make_user(uri, true),
{:ok, magic_key} <- Pleroma.Web.Salmon.fetch_magic_key(body),
{:ok, doc} <- Pleroma.Web.Salmon.decode_and_validate(magic_key, body) do
{:ok, doc}
end
end
end
def salmon_incoming(conn, _) do
{:ok, body, _conn} = read_body(conn)
{:ok, doc} = decode_or_retry(body)
Federator.enqueue(:incoming_doc, doc)
conn
|> send_resp(200, "")
end
def object(conn, %{"uuid" => uuid}) do
if get_format(conn) == "activity+json" do
ActivityPubController.call(conn, :object)
else
with id <- o_status_url(conn, :object, uuid),
{_, %Activity{} = activity} <-
{:activity, Activity.get_create_activity_by_object_ap_id(id)},
{_, true} <- {:public?, ActivityPub.is_public?(activity)},
%User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
case get_format(conn) do
"html" -> redirect(conn, to: "/notice/#{activity.id}")
_ -> represent_activity(conn, activity, user)
end
else
{:public?, false} ->
{:error, :not_found}
{:activity, nil} ->
{:error, :not_found}
e ->
e
end
end
end
def activity(conn, %{"uuid" => uuid}) do
with id <- o_status_url(conn, :activity, uuid),
- {_, %Activity{} = activity} <- {:activity, Activity.get_by_ap_id(id)},
+ {_, %Activity{} = activity} <- {:activity, Activity.normalize(id)},
{_, true} <- {:public?, ActivityPub.is_public?(activity)},
%User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
case get_format(conn) do
"html" -> redirect(conn, to: "/notice/#{activity.id}")
_ -> represent_activity(conn, activity, user)
end
else
{:public?, false} ->
{:error, :not_found}
{:activity, nil} ->
{:error, :not_found}
e ->
e
end
end
def notice(conn, %{"id" => id}) do
with {_, %Activity{} = activity} <- {:activity, Repo.get(Activity, id)},
{_, true} <- {:public?, ActivityPub.is_public?(activity)},
%User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
case get_format(conn) do
"html" ->
conn
|> put_resp_content_type("text/html")
|> send_file(200, "priv/static/index.html")
_ ->
represent_activity(conn, activity, user)
end
else
{:public?, false} ->
{:error, :not_found}
{:activity, nil} ->
{:error, :not_found}
e ->
e
end
end
defp represent_activity(conn, activity, user) do
response =
activity
|> ActivityRepresenter.to_simple_form(user, true)
|> ActivityRepresenter.wrap_with_entry()
|> :xmerl.export_simple(:xmerl_xml)
|> to_string
conn
|> put_resp_content_type("application/atom+xml")
|> send_resp(200, response)
end
def errors(conn, {:error, :not_found}) do
conn
|> put_status(404)
|> text("Not found")
end
def errors(conn, _) do
conn
|> put_status(500)
|> text("Something went wrong")
end
end
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index ce38f3cc3..c61bad830 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -1,186 +1,186 @@
defmodule Pleroma.Web.Streamer do
use GenServer
require Logger
alias Pleroma.{User, Notification, Activity, Object}
def init(args) do
{:ok, args}
end
def start_link do
spawn(fn ->
# 30 seconds
Process.sleep(1000 * 30)
GenServer.cast(__MODULE__, %{action: :ping})
end)
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def add_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
end
def remove_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
end
def stream(topic, item) do
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
end
def handle_cast(%{action: :ping}, topics) do
Map.values(topics)
|> List.flatten()
|> Enum.each(fn socket ->
Logger.debug("Sending keepalive ping")
send(socket.transport_pid, {:text, ""})
end)
spawn(fn ->
# 30 seconds
Process.sleep(1000 * 30)
GenServer.cast(__MODULE__, %{action: :ping})
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
Enum.each(recipient_topics || [], fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
push_to_socket(topics, user_topic, item)
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
recipient_topics =
Pleroma.List.get_lists_from_activity(item)
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
Enum.each(recipient_topics || [], fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
push_to_socket(topics, list_topic, item)
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
topic = "user:#{item.user_id}"
Enum.each(topics[topic] || [], fn socket ->
json =
%{
event: "notification",
payload:
Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(
socket.assigns["user"],
item
)
|> Jason.encode!()
}
|> Jason.encode!()
send(socket.transport_pid, {:text, json})
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
Logger.debug("Trying to push to users")
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
Enum.each(recipient_topics, fn topic ->
push_to_socket(topics, topic, item)
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(topics, topic, item)
{:noreply, topics}
end
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Got new conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = List.delete(sockets_for_topic, socket)
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Removed conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(m, state) do
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state}
end
defp represent_update(%Activity{} = activity, %User{} = user) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity,
for: user
)
|> Jason.encode!()
}
|> Jason.encode!()
end
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info["blocks"] || []
- parent = Object.get_by_ap_id(item.data["object"])
+ parent = Object.normalize(item.data["object"])
unless is_nil(parent) or item.actor in blocks or parent.data["actor"] in blocks do
send(socket.transport_pid, {:text, represent_update(item, user)})
end
end)
end
def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info["blocks"] || []
unless item.actor in blocks do
send(socket.transport_pid, {:text, represent_update(item, user)})
end
end)
end
defp internal_topic(topic, socket) when topic in ~w[user direct] do
"#{topic}:#{socket.assigns[:user].id}"
end
defp internal_topic(topic, _), do: topic
end

File Metadata

Mime Type
text/x-diff
Expires
Thu, Nov 28, 7:12 AM (1 d, 16 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
40957
Default Alt Text
(112 KB)

Event Timeline