Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F85197452
activity_pub.ex
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Size
54 KB
Referenced Files
None
Subscribers
None
activity_pub.ex
View Options
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule
Pleroma.Web.ActivityPub.ActivityPub
do
alias
Pleroma.Activity
alias
Pleroma.Activity.Ir.Topics
alias
Pleroma.Config
alias
Pleroma.Constants
alias
Pleroma.Conversation
alias
Pleroma.Conversation.Participation
alias
Pleroma.Filter
alias
Pleroma.Hashtag
alias
Pleroma.Maps
alias
Pleroma.Notification
alias
Pleroma.Object
alias
Pleroma.Object.Containment
alias
Pleroma.Object.Fetcher
alias
Pleroma.Pagination
alias
Pleroma.Repo
alias
Pleroma.Upload
alias
Pleroma.User
alias
Pleroma.Web.ActivityPub.MRF
alias
Pleroma.Web.ActivityPub.Transmogrifier
alias
Pleroma.Web.Streamer
alias
Pleroma.Web.WebFinger
alias
Pleroma.Workers.BackgroundWorker
alias
Pleroma.Workers.PollWorker
import
Ecto.Query
import
Pleroma.Web.ActivityPub.Utils
import
Pleroma.Web.ActivityPub.Visibility
require
Logger
require
Pleroma.Constants
@behaviour
Pleroma.Web.ActivityPub.ActivityPub.Persisting
@behaviour
Pleroma.Web.ActivityPub.ActivityPub.Streaming
defp
get_recipients
(%{
"type"
=>
"Create"
}
=
data
)
do
to
=
Map
.
get
(
data
,
"to"
,
[])
cc
=
Map
.
get
(
data
,
"cc"
,
[])
bcc
=
Map
.
get
(
data
,
"bcc"
,
[])
actor
=
Map
.
get
(
data
,
"actor"
,
[])
recipients
=
[
to
,
cc
,
bcc
,
[
actor
]]
|>
Enum
.
concat
()
|>
Enum
.
uniq
()
{
recipients
,
to
,
cc
}
end
defp
get_recipients
(
data
)
do
to
=
Map
.
get
(
data
,
"to"
,
[])
cc
=
Map
.
get
(
data
,
"cc"
,
[])
bcc
=
Map
.
get
(
data
,
"bcc"
,
[])
recipients
=
Enum
.
concat
([
to
,
cc
,
bcc
])
{
recipients
,
to
,
cc
}
end
defp
check_actor_can_insert
(%{
"type"
=>
"Delete"
}),
do
:
true
defp
check_actor_can_insert
(%{
"type"
=>
"Undo"
}),
do
:
true
defp
check_actor_can_insert
(%{
"actor"
=>
actor
})
when
is_binary
(
actor
)
do
case
User
.
get_cached_by_ap_id
(
actor
)
do
%
User
{
is_active
:
true
}
->
true
_
->
false
end
end
defp
check_actor_can_insert
(
_
),
do
:
true
defp
check_remote_limit
(%{
"object"
=>
%{
"content"
=>
content
}})
when
not
is_nil
(
content
)
do
limit
=
Config
.
get
([
:instance
,
:remote_limit
])
String
.
length
(
content
)
<=
limit
end
defp
check_remote_limit
(
_
),
do
:
true
def
increase_note_count_if_public
(
actor
,
object
)
do
if
public?
(
object
),
do
:
User
.
increase_note_count
(
actor
),
else
:
{
:ok
,
actor
}
end
def
decrease_note_count_if_public
(
actor
,
object
)
do
if
public?
(
object
),
do
:
User
.
decrease_note_count
(
actor
),
else
:
{
:ok
,
actor
}
end
def
update_last_status_at_if_public
(
actor
,
object
)
do
if
public?
(
object
),
do
:
User
.
update_last_status_at
(
actor
),
else
:
{
:ok
,
actor
}
end
defp
increase_replies_count_if_reply
(%{
"object"
=>
%{
"inReplyTo"
=>
reply_ap_id
}
=
object
,
"type"
=>
"Create"
})
do
if
public?
(
object
)
do
Object
.
increase_replies_count
(
reply_ap_id
)
end
end
defp
increase_replies_count_if_reply
(
_create_data
),
do
:
:noop
defp
increase_quotes_count_if_quote
(%{
"object"
=>
%{
"quoteUrl"
=>
quote_ap_id
}
=
object
,
"type"
=>
"Create"
})
do
if
public?
(
object
)
do
Object
.
increase_quotes_count
(
quote_ap_id
)
end
end
defp
increase_quotes_count_if_quote
(
_create_data
),
do
:
:noop
@object_types
~w[ChatMessage Question Answer Audio Video Image Event Article Note Page]
@impl
true
def
persist
(%{
"type"
=>
type
}
=
object
,
meta
)
when
type
in
@object_types
do
with
{
:ok
,
object
}
<-
Object
.
create
(
object
)
do
{
:ok
,
object
,
meta
}
end
end
@impl
true
def
persist
(
object
,
meta
)
do
with
local
<-
Keyword
.
fetch!
(
meta
,
:local
),
{
recipients
,
_
,
_
}
<-
get_recipients
(
object
),
{
:ok
,
activity
}
<-
Repo
.
insert
(%
Activity
{
data
:
object
,
local
:
local
,
recipients
:
recipients
,
actor
:
object
[
"actor"
]
}),
# TODO: add tests for expired activities, when Note type will be supported in new pipeline
{
:ok
,
_
}
<-
maybe_create_activity_expiration
(
activity
)
do
{
:ok
,
activity
,
meta
}
end
end
@spec
insert
(
map
(),
boolean
(),
boolean
(),
boolean
())
::
{
:ok
,
Activity
.
t
()}
|
{
:error
,
any
()}
def
insert
(
map
,
local
\\
true
,
fake
\\
false
,
bypass_actor_check
\\
false
)
when
is_map
(
map
)
do
with
nil
<-
Activity
.
normalize
(
map
),
map
<-
lazy_put_activity_defaults
(
map
,
fake
),
{
_
,
true
}
<-
{
:actor_check
,
bypass_actor_check
||
check_actor_can_insert
(
map
)},
{
_
,
true
}
<-
{
:remote_limit_pass
,
check_remote_limit
(
map
)},
{
:ok
,
map
}
<-
MRF
.
filter
(
map
),
{
recipients
,
_
,
_
}
=
get_recipients
(
map
),
{
:fake
,
false
,
map
,
recipients
}
<-
{
:fake
,
fake
,
map
,
recipients
},
{
:containment
,
:ok
}
<-
{
:containment
,
Containment
.
contain_child
(
map
)},
{
:ok
,
map
,
object
}
<-
insert_full_object
(
map
),
{
:ok
,
activity
}
<-
insert_activity_with_expiration
(
map
,
local
,
recipients
)
do
# Splice in the child object if we have one.
activity
=
Maps
.
put_if_present
(
activity
,
:object
,
object
)
ConcurrentLimiter
.
limit
(
Pleroma.Web.RichMedia.Helpers
,
fn
->
Task
.
start
(
fn
->
Pleroma.Web.RichMedia.Helpers
.
fetch_data_for_activity
(
activity
)
end
)
end
)
# Add local posts to search index
if
local
,
do
:
Pleroma.Search
.
add_to_index
(
activity
)
{
:ok
,
activity
}
else
%
Activity
{}
=
activity
->
{
:ok
,
activity
}
{
:actor_check
,
_
}
->
{
:error
,
false
}
{
:containment
,
_
}
=
error
->
error
{
:error
,
_
}
=
error
->
error
{
:fake
,
true
,
map
,
recipients
}
->
activity
=
%
Activity
{
data
:
map
,
local
:
local
,
actor
:
map
[
"actor"
],
recipients
:
recipients
,
id
:
"pleroma:fakeid"
}
Pleroma.Web.RichMedia.Helpers
.
fetch_data_for_activity
(
activity
)
{
:ok
,
activity
}
{
:remote_limit_pass
,
_
}
->
{
:error
,
:remote_limit
}
{
:reject
,
_
}
=
e
->
{
:error
,
e
}
end
end
defp
insert_activity_with_expiration
(
data
,
local
,
recipients
)
do
struct
=
%
Activity
{
data
:
data
,
local
:
local
,
actor
:
data
[
"actor"
],
recipients
:
recipients
}
with
{
:ok
,
activity
}
<-
Repo
.
insert
(
struct
)
do
maybe_create_activity_expiration
(
activity
)
end
end
def
notify_and_stream
(
activity
)
do
Notification
.
create_notifications
(
activity
)
original_activity
=
case
activity
do
%{
data
:
%{
"type"
=>
"Update"
},
object
:
%{
data
:
%{
"id"
=>
id
}}}
->
Activity
.
get_create_by_object_ap_id_with_object
(
id
)
_
->
activity
end
conversation
=
create_or_bump_conversation
(
original_activity
,
original_activity
.
actor
)
participations
=
get_participations
(
conversation
)
stream_out
(
activity
)
stream_out_participations
(
participations
)
end
defp
maybe_create_activity_expiration
(
%{
data
:
%{
"expires_at"
=>
%
DateTime
{}
=
expires_at
}}
=
activity
)
do
with
{
:ok
,
_job
}
<-
Pleroma.Workers.PurgeExpiredActivity
.
enqueue
(%{
activity_id
:
activity
.
id
,
expires_at
:
expires_at
})
do
{
:ok
,
activity
}
end
end
defp
maybe_create_activity_expiration
(
activity
),
do
:
{
:ok
,
activity
}
defp
create_or_bump_conversation
(
activity
,
actor
)
do
with
{
:ok
,
conversation
}
<-
Conversation
.
create_or_bump_for
(
activity
),
%
User
{}
=
user
<-
User
.
get_cached_by_ap_id
(
actor
)
do
Participation
.
mark_as_read
(
user
,
conversation
)
{
:ok
,
conversation
}
end
end
defp
get_participations
({
:ok
,
conversation
})
do
conversation
|>
Repo
.
preload
(
:participations
,
force
:
true
)
|>
Map
.
get
(
:participations
)
end
defp
get_participations
(
_
),
do
:
[]
def
stream_out_participations
(
participations
)
do
participations
=
participations
|>
Repo
.
preload
(
:user
)
Streamer
.
stream
(
"participation"
,
participations
)
end
@impl
true
def
stream_out_participations
(%
Object
{
data
:
%{
"context"
=>
context
}},
user
)
do
with
%
Conversation
{}
=
conversation
<-
Conversation
.
get_for_ap_id
(
context
)
do
conversation
=
Repo
.
preload
(
conversation
,
:participations
)
last_activity_id
=
fetch_latest_direct_activity_id_for_context
(
conversation
.
ap_id
,
%{
user
:
user
,
blocking_user
:
user
})
if
last_activity_id
do
stream_out_participations
(
conversation
.
participations
)
end
end
end
@impl
true
def
stream_out_participations
(
_
,
_
),
do
:
:noop
@impl
true
def
stream_out
(%
Activity
{
data
:
%{
"type"
=>
data_type
}}
=
activity
)
when
data_type
in
[
"Create"
,
"Announce"
,
"Delete"
,
"Update"
]
do
activity
|>
Topics
.
get_activity_topics
()
|>
Streamer
.
stream
(
activity
)
end
@impl
true
def
stream_out
(
_activity
)
do
:noop
end
@spec
create
(
map
(),
boolean
())
::
{
:ok
,
Activity
.
t
()}
|
{
:error
,
any
()}
def
create
(
params
,
fake
\\
false
)
do
with
{
:ok
,
result
}
<-
Repo
.
transaction
(
fn
->
do_create
(
params
,
fake
)
end
)
do
result
end
end
defp
do_create
(%{
to
:
to
,
actor
:
actor
,
context
:
context
,
object
:
object
}
=
params
,
fake
)
do
additional
=
params
[
:additional
]
||
%{}
# only accept false as false value
local
=
!
(
params
[
:local
]
==
false
)
published
=
params
[
:published
]
quick_insert?
=
Config
.
get
([
:env
])
==
:benchmark
create_data
=
make_create_data
(
%{
to
:
to
,
actor
:
actor
,
published
:
published
,
context
:
context
,
object
:
object
},
additional
)
with
{
:ok
,
activity
}
<-
insert
(
create_data
,
local
,
fake
),
{
:fake
,
false
,
activity
}
<-
{
:fake
,
fake
,
activity
},
_
<-
increase_replies_count_if_reply
(
create_data
),
_
<-
increase_quotes_count_if_quote
(
create_data
),
{
:quick_insert
,
false
,
activity
}
<-
{
:quick_insert
,
quick_insert?
,
activity
},
{
:ok
,
_actor
}
<-
increase_note_count_if_public
(
actor
,
activity
),
{
:ok
,
_actor
}
<-
update_last_status_at_if_public
(
actor
,
activity
),
_
<-
notify_and_stream
(
activity
),
:ok
<-
maybe_schedule_poll_notifications
(
activity
),
:ok
<-
maybe_handle_group_posts
(
activity
),
:ok
<-
maybe_federate
(
activity
)
do
{
:ok
,
activity
}
else
{
:quick_insert
,
true
,
activity
}
->
{
:ok
,
activity
}
{
:fake
,
true
,
activity
}
->
{
:ok
,
activity
}
{
:error
,
message
}
->
Repo
.
rollback
(
message
)
end
end
defp
maybe_schedule_poll_notifications
(
activity
)
do
PollWorker
.
schedule_poll_end
(
activity
)
:ok
end
@spec
listen
(
map
())
::
{
:ok
,
Activity
.
t
()}
|
{
:error
,
any
()}
def
listen
(%{
to
:
to
,
actor
:
actor
,
context
:
context
,
object
:
object
}
=
params
)
do
additional
=
params
[
:additional
]
||
%{}
# only accept false as false value
local
=
!
(
params
[
:local
]
==
false
)
published
=
params
[
:published
]
listen_data
=
make_listen_data
(
%{
to
:
to
,
actor
:
actor
,
published
:
published
,
context
:
context
,
object
:
object
},
additional
)
with
{
:ok
,
activity
}
<-
insert
(
listen_data
,
local
),
_
<-
notify_and_stream
(
activity
),
:ok
<-
maybe_federate
(
activity
)
do
{
:ok
,
activity
}
end
end
@spec
unfollow
(
User
.
t
(),
User
.
t
(),
String
.
t
()
|
nil
,
boolean
())
::
{
:ok
,
Activity
.
t
()}
|
nil
|
{
:error
,
any
()}
def
unfollow
(
follower
,
followed
,
activity_id
\\
nil
,
local
\\
true
)
do
with
{
:ok
,
result
}
<-
Repo
.
transaction
(
fn
->
do_unfollow
(
follower
,
followed
,
activity_id
,
local
)
end
)
do
result
end
end
defp
do_unfollow
(
follower
,
followed
,
activity_id
,
local
)
do
with
%
Activity
{}
=
follow_activity
<-
fetch_latest_follow
(
follower
,
followed
),
{
:ok
,
follow_activity
}
<-
update_follow_state
(
follow_activity
,
"cancelled"
),
unfollow_data
<-
make_unfollow_data
(
follower
,
followed
,
follow_activity
,
activity_id
),
{
:ok
,
activity
}
<-
insert
(
unfollow_data
,
local
),
_
<-
notify_and_stream
(
activity
),
:ok
<-
maybe_federate
(
activity
)
do
{
:ok
,
activity
}
else
nil
->
nil
{
:error
,
error
}
->
Repo
.
rollback
(
error
)
end
end
@spec
flag
(
map
())
::
{
:ok
,
Activity
.
t
()}
|
{
:error
,
any
()}
def
flag
(
params
)
do
with
{
:ok
,
result
}
<-
Repo
.
transaction
(
fn
->
do_flag
(
params
)
end
)
do
result
end
end
defp
do_flag
(
%{
actor
:
actor
,
context
:
_context
,
account
:
account
,
statuses
:
statuses
,
content
:
content
}
=
params
)
do
# only accept false as false value
local
=
!
(
params
[
:local
]
==
false
)
forward
=
!
(
params
[
:forward
]
==
false
)
additional
=
params
[
:additional
]
||
%{}
additional
=
if
forward
do
Map
.
merge
(
additional
,
%{
"to"
=>
[],
"cc"
=>
[
account
.
ap_id
]})
else
Map
.
merge
(
additional
,
%{
"to"
=>
[],
"cc"
=>
[]})
end
with
flag_data
<-
make_flag_data
(
params
,
additional
),
{
:ok
,
activity
}
<-
insert
(
flag_data
,
local
),
{
:ok
,
stripped_activity
}
<-
strip_report_status_data
(
activity
),
_
<-
notify_and_stream
(
activity
),
:ok
<-
maybe_federate
(
stripped_activity
)
do
User
.
all_users_with_privilege
(
:reports_manage_reports
)
|>
Enum
.
filter
(
fn
user
->
user
.
ap_id
!=
actor
end
)
|>
Enum
.
filter
(
fn
user
->
not
is_nil
(
user
.
email
)
end
)
|>
Enum
.
each
(
fn
privileged_user
->
privileged_user
|>
Pleroma.Emails.AdminEmail
.
report
(
actor
,
account
,
statuses
,
content
)
|>
Pleroma.Emails.Mailer
.
deliver_async
()
end
)
{
:ok
,
activity
}
else
{
:error
,
error
}
->
Repo
.
rollback
(
error
)
end
end
@spec
move
(
User
.
t
(),
User
.
t
(),
boolean
())
::
{
:ok
,
Activity
.
t
()}
|
{
:error
,
any
()}
def
move
(%
User
{}
=
origin
,
%
User
{}
=
target
,
local
\\
true
)
do
params
=
%{
"type"
=>
"Move"
,
"actor"
=>
origin
.
ap_id
,
"object"
=>
origin
.
ap_id
,
"target"
=>
target
.
ap_id
,
"to"
=>
[
origin
.
follower_address
]
}
with
true
<-
origin
.
ap_id
in
target
.
also_known_as
,
{
:ok
,
activity
}
<-
insert
(
params
,
local
),
_
<-
notify_and_stream
(
activity
)
do
maybe_federate
(
activity
)
BackgroundWorker
.
enqueue
(
"move_following"
,
%{
"origin_id"
=>
origin
.
id
,
"target_id"
=>
target
.
id
})
{
:ok
,
activity
}
else
false
->
{
:error
,
"Target account must have the origin in `alsoKnownAs`"
}
err
->
err
end
end
def
fetch_activities_for_context_query
(
context
,
opts
)
do
public
=
[
Constants
.
as_public
()]
recipients
=
if
opts
[
:user
],
do
:
[
opts
[
:user
]
.
ap_id
|
User
.
following
(
opts
[
:user
])]
++
public
,
else
:
public
from
(
activity
in
Activity
)
|>
maybe_preload_objects
(
opts
)
|>
maybe_preload_bookmarks
(
opts
)
|>
maybe_set_thread_muted_field
(
opts
)
|>
restrict_unauthenticated
(
opts
[
:user
])
|>
restrict_blocked
(
opts
)
|>
restrict_blockers_visibility
(
opts
)
|>
restrict_recipients
(
recipients
,
opts
[
:user
])
|>
restrict_filtered
(
opts
)
|>
where
(
[
activity
],
fragment
(
"?->>'type' = ? and ?->>'context' = ?"
,
activity
.
data
,
"Create"
,
activity
.
data
,
^
context
)
)
|>
exclude_poll_votes
(
opts
)
|>
exclude_id
(
opts
)
|>
order_by
([
activity
],
desc
:
activity
.
id
)
end
@spec
fetch_activities_for_context
(
String
.
t
(),
keyword
()
|
map
())
::
[
Activity
.
t
()]
def
fetch_activities_for_context
(
context
,
opts
\\
%{})
do
context
|>
fetch_activities_for_context_query
(
opts
)
|>
Repo
.
all
()
end
@spec
fetch_latest_direct_activity_id_for_context
(
String
.
t
(),
keyword
()
|
map
())
::
Ecto.UUID
.
t
()
|
nil
def
fetch_latest_direct_activity_id_for_context
(
context
,
opts
\\
%{})
do
context
|>
fetch_activities_for_context_query
(
Map
.
merge
(%{
skip_preload
:
true
},
opts
))
|>
restrict_visibility
(%{
visibility
:
"direct"
})
|>
limit
(
1
)
|>
select
([
a
],
a
.
id
)
|>
Repo
.
one
()
end
defp
fetch_paginated_optimized
(
query
,
opts
,
pagination
)
do
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
# and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
opts
=
Map
.
put
(
opts
,
:skip_extra_order
,
true
)
Pagination
.
fetch_paginated
(
query
,
opts
,
pagination
)
end
def
fetch_activities
(
recipients
,
opts
\\
%{},
pagination
\\
:keyset
)
do
list_memberships
=
Pleroma.List
.
memberships
(
opts
[
:user
])
fetch_activities_query
(
recipients
++
list_memberships
,
opts
)
|>
fetch_paginated_optimized
(
opts
,
pagination
)
|>
Enum
.
reverse
()
|>
maybe_update_cc
(
list_memberships
,
opts
[
:user
])
end
@spec
fetch_public_or_unlisted_activities
(
map
(),
Pagination
.
type
())
::
[
Activity
.
t
()]
def
fetch_public_or_unlisted_activities
(
opts
\\
%{},
pagination
\\
:keyset
)
do
includes_local_public
=
Map
.
get
(
opts
,
:includes_local_public
,
false
)
opts
=
Map
.
delete
(
opts
,
:user
)
intended_recipients
=
if
includes_local_public
do
[
Constants
.
as_public
(),
as_local_public
()]
else
[
Constants
.
as_public
()]
end
intended_recipients
|>
fetch_activities_query
(
opts
)
|>
restrict_unlisted
(
opts
)
|>
fetch_paginated_optimized
(
opts
,
pagination
)
end
@spec
fetch_public_activities
(
map
(),
Pagination
.
type
())
::
[
Activity
.
t
()]
def
fetch_public_activities
(
opts
\\
%{},
pagination
\\
:keyset
)
do
opts
|>
Map
.
put
(
:restrict_unlisted
,
true
)
|>
fetch_public_or_unlisted_activities
(
pagination
)
end
@valid_visibilities
~w[direct unlisted public private]
defp
restrict_visibility
(
query
,
%{
visibility
:
visibility
})
when
is_list
(
visibility
)
do
if
Enum
.
all?
(
visibility
,
&
(
&1
in
@valid_visibilities
))
do
from
(
a
in
query
,
where
:
fragment
(
"activity_visibility(?, ?, ?) = ANY (?)"
,
a
.
actor
,
a
.
recipients
,
a
.
data
,
^
visibility
)
)
else
Logger
.
error
(
"Could not restrict visibility to
#{
visibility
}
"
)
end
end
defp
restrict_visibility
(
query
,
%{
visibility
:
visibility
})
when
visibility
in
@valid_visibilities
do
from
(
a
in
query
,
where
:
fragment
(
"activity_visibility(?, ?, ?) = ?"
,
a
.
actor
,
a
.
recipients
,
a
.
data
,
^
visibility
)
)
end
defp
restrict_visibility
(
_query
,
%{
visibility
:
visibility
})
when
visibility
not
in
@valid_visibilities
do
Logger
.
error
(
"Could not restrict visibility to
#{
visibility
}
"
)
end
defp
restrict_visibility
(
query
,
_visibility
),
do
:
query
defp
exclude_visibility
(
query
,
%{
exclude_visibilities
:
visibility
})
when
is_list
(
visibility
)
do
if
Enum
.
all?
(
visibility
,
&
(
&1
in
@valid_visibilities
))
do
from
(
a
in
query
,
where
:
not
fragment
(
"activity_visibility(?, ?, ?) = ANY (?)"
,
a
.
actor
,
a
.
recipients
,
a
.
data
,
^
visibility
)
)
else
Logger
.
error
(
"Could not exclude visibility to
#{
visibility
}
"
)
query
end
end
defp
exclude_visibility
(
query
,
%{
exclude_visibilities
:
visibility
})
when
visibility
in
@valid_visibilities
do
from
(
a
in
query
,
where
:
not
fragment
(
"activity_visibility(?, ?, ?) = ?"
,
a
.
actor
,
a
.
recipients
,
a
.
data
,
^
visibility
)
)
end
defp
exclude_visibility
(
query
,
%{
exclude_visibilities
:
visibility
})
when
visibility
not
in
[
nil
|
@valid_visibilities
]
do
Logger
.
error
(
"Could not exclude visibility to
#{
visibility
}
"
)
query
end
defp
exclude_visibility
(
query
,
_visibility
),
do
:
query
defp
restrict_thread_visibility
(
query
,
_
,
%{
skip_thread_containment
:
true
}
=
_
),
do
:
query
defp
restrict_thread_visibility
(
query
,
%{
user
:
%
User
{
skip_thread_containment
:
true
}},
_
),
do
:
query
defp
restrict_thread_visibility
(
query
,
%{
user
:
%
User
{
ap_id
:
ap_id
}},
_
)
do
local_public
=
as_local_public
()
from
(
a
in
query
,
where
:
fragment
(
"thread_visibility(?, (?)->>'id', ?) = true"
,
^
ap_id
,
a
.
data
,
^
local_public
)
)
end
defp
restrict_thread_visibility
(
query
,
_
,
_
),
do
:
query
def
fetch_user_abstract_activities
(
user
,
reading_user
,
params
\\
%{})
do
params
=
params
|>
Map
.
put
(
:user
,
reading_user
)
|>
Map
.
put
(
:actor_id
,
user
.
ap_id
)
%{
godmode
:
params
[
:godmode
],
reading_user
:
reading_user
}
|>
user_activities_recipients
()
|>
fetch_activities
(
params
)
|>
Enum
.
reverse
()
end
def
fetch_user_activities
(
user
,
reading_user
,
params
\\
%{})
def
fetch_user_activities
(
user
,
reading_user
,
%{
total
:
true
}
=
params
)
do
result
=
fetch_activities_for_user
(
user
,
reading_user
,
params
)
Keyword
.
put
(
result
,
:items
,
Enum
.
reverse
(
result
[
:items
]))
end
def
fetch_user_activities
(
user
,
reading_user
,
params
)
do
user
|>
fetch_activities_for_user
(
reading_user
,
params
)
|>
Enum
.
reverse
()
end
defp
fetch_activities_for_user
(
user
,
reading_user
,
params
)
do
params
=
params
|>
Map
.
put
(
:type
,
[
"Create"
,
"Announce"
])
|>
Map
.
put
(
:user
,
reading_user
)
|>
Map
.
put
(
:actor_id
,
user
.
ap_id
)
|>
Map
.
put
(
:pinned_object_ids
,
Map
.
keys
(
user
.
pinned_objects
))
params
=
if
User
.
blocks?
(
reading_user
,
user
)
do
params
else
params
|>
Map
.
put
(
:blocking_user
,
reading_user
)
|>
Map
.
put
(
:muting_user
,
reading_user
)
end
pagination_type
=
Map
.
get
(
params
,
:pagination_type
)
||
:keyset
%{
godmode
:
params
[
:godmode
],
reading_user
:
reading_user
}
|>
user_activities_recipients
()
|>
fetch_activities
(
params
,
pagination_type
)
end
def
fetch_statuses
(
reading_user
,
%{
total
:
true
}
=
params
)
do
result
=
fetch_activities_for_reading_user
(
reading_user
,
params
)
Keyword
.
put
(
result
,
:items
,
Enum
.
reverse
(
result
[
:items
]))
end
def
fetch_statuses
(
reading_user
,
params
)
do
reading_user
|>
fetch_activities_for_reading_user
(
params
)
|>
Enum
.
reverse
()
end
defp
fetch_activities_for_reading_user
(
reading_user
,
params
)
do
params
=
Map
.
put
(
params
,
:type
,
[
"Create"
,
"Announce"
])
%{
godmode
:
params
[
:godmode
],
reading_user
:
reading_user
}
|>
user_activities_recipients
()
|>
fetch_activities
(
params
,
:offset
)
end
defp
user_activities_recipients
(%{
godmode
:
true
}),
do
:
[]
defp
user_activities_recipients
(%{
reading_user
:
reading_user
})
do
if
not
is_nil
(
reading_user
)
and
reading_user
.
local
do
[
Constants
.
as_public
(),
as_local_public
(),
reading_user
.
ap_id
|
User
.
following
(
reading_user
)
]
else
[
Constants
.
as_public
()]
end
end
defp
restrict_announce_object_actor
(
_query
,
%{
announce_filtering_user
:
_
,
skip_preload
:
true
})
do
raise
"Can't use the child object without preloading!"
end
defp
restrict_announce_object_actor
(
query
,
%{
announce_filtering_user
:
%{
ap_id
:
actor
}})
do
from
(
[
activity
,
object
]
in
query
,
where
:
fragment
(
"?->>'type' != ? or ?->>'actor' != ?"
,
activity
.
data
,
"Announce"
,
object
.
data
,
^
actor
)
)
end
defp
restrict_announce_object_actor
(
query
,
_
),
do
:
query
defp
restrict_since
(
query
,
%{
since_id
:
""
}),
do
:
query
defp
restrict_since
(
query
,
%{
since_id
:
since_id
})
do
from
(
activity
in
query
,
where
:
activity
.
id
>
^
since_id
)
end
defp
restrict_since
(
query
,
_
),
do
:
query
defp
restrict_embedded_tag_all
(
_query
,
%{
tag_all
:
_tag_all
,
skip_preload
:
true
})
do
raise_on_missing_preload
()
end
defp
restrict_embedded_tag_all
(
query
,
%{
tag_all
:
[
_
|
_
]
=
tag_all
})
do
from
(
[
_activity
,
object
]
in
query
,
where
:
fragment
(
"(?)->'tag' \\?& (?)"
,
object
.
data
,
^
tag_all
)
)
end
defp
restrict_embedded_tag_all
(
query
,
%{
tag_all
:
tag
})
when
is_binary
(
tag
)
do
restrict_embedded_tag_any
(
query
,
%{
tag
:
tag
})
end
defp
restrict_embedded_tag_all
(
query
,
_
),
do
:
query
defp
restrict_embedded_tag_any
(
_query
,
%{
tag
:
_tag
,
skip_preload
:
true
})
do
raise_on_missing_preload
()
end
defp
restrict_embedded_tag_any
(
query
,
%{
tag
:
[
_
|
_
]
=
tag_any
})
do
from
(
[
_activity
,
object
]
in
query
,
where
:
fragment
(
"(?)->'tag' \\?| (?)"
,
object
.
data
,
^
tag_any
)
)
end
defp
restrict_embedded_tag_any
(
query
,
%{
tag
:
tag
})
when
is_binary
(
tag
)
do
restrict_embedded_tag_any
(
query
,
%{
tag
:
[
tag
]})
end
defp
restrict_embedded_tag_any
(
query
,
_
),
do
:
query
defp
restrict_embedded_tag_reject_any
(
_query
,
%{
tag_reject
:
_tag_reject
,
skip_preload
:
true
})
do
raise_on_missing_preload
()
end
defp
restrict_embedded_tag_reject_any
(
query
,
%{
tag_reject
:
[
_
|
_
]
=
tag_reject
})
do
from
(
[
_activity
,
object
]
in
query
,
where
:
fragment
(
"not (?)->'tag' \\?| (?)"
,
object
.
data
,
^
tag_reject
)
)
end
defp
restrict_embedded_tag_reject_any
(
query
,
%{
tag_reject
:
tag_reject
})
when
is_binary
(
tag_reject
)
do
restrict_embedded_tag_reject_any
(
query
,
%{
tag_reject
:
[
tag_reject
]})
end
defp
restrict_embedded_tag_reject_any
(
query
,
_
),
do
:
query
defp
object_ids_query_for_tags
(
tags
)
do
from
(
hto
in
"hashtags_objects"
)
|>
join
(
:inner
,
[
hto
],
ht
in
Pleroma.Hashtag
,
on
:
hto
.
hashtag_id
==
ht
.
id
)
|>
where
([
hto
,
ht
],
ht
.
name
in
^
tags
)
|>
select
([
hto
],
hto
.
object_id
)
|>
distinct
([
hto
],
true
)
end
defp
restrict_hashtag_all
(
_query
,
%{
tag_all
:
_tag
,
skip_preload
:
true
})
do
raise_on_missing_preload
()
end
defp
restrict_hashtag_all
(
query
,
%{
tag_all
:
[
single_tag
]})
do
restrict_hashtag_any
(
query
,
%{
tag
:
single_tag
})
end
defp
restrict_hashtag_all
(
query
,
%{
tag_all
:
[
_
|
_
]
=
tags
})
do
from
(
[
_activity
,
object
]
in
query
,
where
:
fragment
(
"""
(SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
AND hashtags_objects.object_id = ?) @> ?
"""
,
^
tags
,
object
.
id
,
^
tags
)
)
end
defp
restrict_hashtag_all
(
query
,
%{
tag_all
:
tag
})
when
is_binary
(
tag
)
do
restrict_hashtag_all
(
query
,
%{
tag_all
:
[
tag
]})
end
defp
restrict_hashtag_all
(
query
,
_
),
do
:
query
defp
restrict_hashtag_any
(
_query
,
%{
tag
:
_tag
,
skip_preload
:
true
})
do
raise_on_missing_preload
()
end
defp
restrict_hashtag_any
(
query
,
%{
tag
:
[
_
|
_
]
=
tags
})
do
hashtag_ids
=
from
(
ht
in
Hashtag
,
where
:
ht
.
name
in
^
tags
,
select
:
ht
.
id
)
|>
Repo
.
all
()
# Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
from
(
[
_activity
,
object
]
in
query
,
join
:
hto
in
"hashtags_objects"
,
on
:
hto
.
object_id
==
object
.
id
,
where
:
hto
.
hashtag_id
in
^
hashtag_ids
,
distinct
:
[
desc
:
object
.
id
],
order_by
:
[
desc
:
object
.
id
]
)
end
defp
restrict_hashtag_any
(
query
,
%{
tag
:
tag
})
when
is_binary
(
tag
)
do
restrict_hashtag_any
(
query
,
%{
tag
:
[
tag
]})
end
defp
restrict_hashtag_any
(
query
,
_
),
do
:
query
defp
restrict_hashtag_reject_any
(
_query
,
%{
tag_reject
:
_tag_reject
,
skip_preload
:
true
})
do
raise_on_missing_preload
()
end
defp
restrict_hashtag_reject_any
(
query
,
%{
tag_reject
:
[
_
|
_
]
=
tags_reject
})
do
from
(
[
_activity
,
object
]
in
query
,
where
:
object
.
id
not
in
subquery
(
object_ids_query_for_tags
(
tags_reject
))
)
end
defp
restrict_hashtag_reject_any
(
query
,
%{
tag_reject
:
tag_reject
})
when
is_binary
(
tag_reject
)
do
restrict_hashtag_reject_any
(
query
,
%{
tag_reject
:
[
tag_reject
]})
end
defp
restrict_hashtag_reject_any
(
query
,
_
),
do
:
query
defp
raise_on_missing_preload
do
raise
"Can't use the child object without preloading!"
end
defp
restrict_recipients
(
query
,
[],
_user
),
do
:
query
defp
restrict_recipients
(
query
,
recipients
,
nil
)
do
from
(
activity
in
query
,
where
:
fragment
(
"? && ?"
,
^
recipients
,
activity
.
recipients
))
end
defp
restrict_recipients
(
query
,
recipients
,
user
)
do
from
(
activity
in
query
,
where
:
fragment
(
"? && ?"
,
^
recipients
,
activity
.
recipients
),
or_where
:
activity
.
actor
==
^
user
.
ap_id
)
end
defp
restrict_local
(
query
,
%{
local_only
:
true
})
do
from
(
activity
in
query
,
where
:
activity
.
local
==
true
)
end
defp
restrict_local
(
query
,
_
),
do
:
query
defp
restrict_remote
(
query
,
%{
remote
:
true
})
do
from
(
activity
in
query
,
where
:
activity
.
local
==
false
)
end
defp
restrict_remote
(
query
,
_
),
do
:
query
defp
restrict_actor
(
query
,
%{
actor_id
:
actor_id
})
do
from
(
activity
in
query
,
where
:
activity
.
actor
==
^
actor_id
)
end
defp
restrict_actor
(
query
,
_
),
do
:
query
defp
restrict_type
(
query
,
%{
type
:
type
})
when
is_binary
(
type
)
do
from
(
activity
in
query
,
where
:
fragment
(
"?->>'type' = ?"
,
activity
.
data
,
^
type
))
end
defp
restrict_type
(
query
,
%{
type
:
type
})
do
from
(
activity
in
query
,
where
:
fragment
(
"?->>'type' = ANY(?)"
,
activity
.
data
,
^
type
))
end
defp
restrict_type
(
query
,
_
),
do
:
query
defp
restrict_state
(
query
,
%{
state
:
state
})
do
from
(
activity
in
query
,
where
:
fragment
(
"?->>'state' = ?"
,
activity
.
data
,
^
state
))
end
defp
restrict_state
(
query
,
_
),
do
:
query
defp
restrict_favorited_by
(
query
,
%{
favorited_by
:
ap_id
})
do
from
(
[
_activity
,
object
]
in
query
,
where
:
fragment
(
"(?)->'likes' \\? (?)"
,
object
.
data
,
^
ap_id
)
)
end
defp
restrict_favorited_by
(
query
,
_
),
do
:
query
defp
restrict_media
(
_query
,
%{
only_media
:
_val
,
skip_preload
:
true
})
do
raise
"Can't use the child object without preloading!"
end
defp
restrict_media
(
query
,
%{
only_media
:
true
})
do
from
(
[
activity
,
object
]
in
query
,
where
:
fragment
(
"(?)->>'type' = ?"
,
activity
.
data
,
"Create"
),
where
:
fragment
(
"not (?)->'attachment' = (?)"
,
object
.
data
,
^
[])
)
end
defp
restrict_media
(
query
,
_
),
do
:
query
defp
restrict_replies
(
query
,
%{
exclude_replies
:
true
})
do
from
(
[
_activity
,
object
]
in
query
,
where
:
fragment
(
"?->>'inReplyTo' is null"
,
object
.
data
)
)
end
defp
restrict_replies
(
query
,
%{
reply_filtering_user
:
%
User
{}
=
user
,
reply_visibility
:
"self"
})
do
from
(
[
activity
,
object
]
in
query
,
where
:
fragment
(
"?->>'inReplyTo' is null OR ? = ANY(?)"
,
object
.
data
,
^
user
.
ap_id
,
activity
.
recipients
)
)
end
defp
restrict_replies
(
query
,
%{
reply_filtering_user
:
%
User
{}
=
user
,
reply_visibility
:
"following"
})
do
from
(
[
activity
,
object
]
in
query
,
where
:
fragment
(
"""
?->>'type' != 'Create' -- This isn't a Create
OR ?->>'inReplyTo' is null -- this isn't a reply
OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
-- unless they are the author (because authors
-- are also part of the recipients). This leads
-- to a bug that self-replies by friends won't
-- show up.
OR ? = ? -- The actor is us
"""
,
activity
.
data
,
object
.
data
,
^
[
user
.
ap_id
|
User
.
get_cached_user_friends_ap_ids
(
user
)],
activity
.
recipients
,
activity
.
actor
,
activity
.
actor
,
^
user
.
ap_id
)
)
end
defp
restrict_replies
(
query
,
_
),
do
:
query
defp
restrict_reblogs
(
query
,
%{
exclude_reblogs
:
true
})
do
from
(
activity
in
query
,
where
:
fragment
(
"?->>'type' != 'Announce'"
,
activity
.
data
))
end
defp
restrict_reblogs
(
query
,
_
),
do
:
query
defp
restrict_muted
(
query
,
%{
with_muted
:
true
}),
do
:
query
defp
restrict_muted
(
query
,
%{
muting_user
:
%
User
{}
=
user
}
=
opts
)
do
mutes
=
opts
[
:muted_users_ap_ids
]
||
User
.
muted_users_ap_ids
(
user
)
query
=
from
([
activity
]
in
query
,
where
:
fragment
(
"not (? = ANY(?))"
,
activity
.
actor
,
^
mutes
),
where
:
fragment
(
"not (?->'to' \\?| ?) or ? = ?"
,
activity
.
data
,
^
mutes
,
activity
.
actor
,
^
user
.
ap_id
)
)
unless
opts
[
:skip_preload
]
do
from
([
thread_mute
:
tm
]
in
query
,
where
:
is_nil
(
tm
.
user_id
))
else
query
end
end
defp
restrict_muted
(
query
,
_
),
do
:
query
defp
restrict_blocked
(
query
,
%{
blocking_user
:
%
User
{}
=
user
}
=
opts
)
do
blocked_ap_ids
=
opts
[
:blocked_users_ap_ids
]
||
User
.
blocked_users_ap_ids
(
user
)
domain_blocks
=
user
.
domain_blocks
||
[]
following_ap_ids
=
User
.
get_friends_ap_ids
(
user
)
query
=
if
has_named_binding?
(
query
,
:object
),
do
:
query
,
else
:
Activity
.
with_joined_object
(
query
)
from
(
[
activity
,
object
:
o
]
in
query
,
# You don't block the author
where
:
fragment
(
"not (? = ANY(?))"
,
activity
.
actor
,
^
blocked_ap_ids
),
# You don't block any recipients, and didn't author the post
where
:
fragment
(
"((not (? && ?)) or ? = ?)"
,
activity
.
recipients
,
^
blocked_ap_ids
,
activity
.
actor
,
^
user
.
ap_id
),
# You don't block the domain of any recipients, and didn't author the post
where
:
fragment
(
"(recipients_contain_blocked_domains(?, ?) = false) or ? = ?"
,
activity
.
recipients
,
^
domain_blocks
,
activity
.
actor
,
^
user
.
ap_id
),
# It's not a boost of a user you block
where
:
fragment
(
"not (?->>'type' = 'Announce' and ?->'to' \\?| ?)"
,
activity
.
data
,
activity
.
data
,
^
blocked_ap_ids
),
# You don't block the author's domain, and also don't follow the author
where
:
fragment
(
"(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)"
,
activity
.
actor
,
^
domain_blocks
,
activity
.
actor
,
^
following_ap_ids
),
# Same as above, but checks the Object
where
:
fragment
(
"(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)"
,
o
.
data
,
^
domain_blocks
,
o
.
data
,
^
following_ap_ids
)
)
end
defp
restrict_blocked
(
query
,
_
),
do
:
query
defp
restrict_blockers_visibility
(
query
,
%{
blocking_user
:
%
User
{}
=
user
})
do
if
Config
.
get
([
:activitypub
,
:blockers_visible
])
==
true
do
query
else
blocker_ap_ids
=
User
.
incoming_relationships_ungrouped_ap_ids
(
user
,
[
:block
])
from
(
activity
in
query
,
# The author doesn't block you
where
:
fragment
(
"not (? = ANY(?))"
,
activity
.
actor
,
^
blocker_ap_ids
),
# It's not a boost of a user that blocks you
where
:
fragment
(
"not (?->>'type' = 'Announce' and ?->'to' \\?| ?)"
,
activity
.
data
,
activity
.
data
,
^
blocker_ap_ids
)
)
end
end
defp
restrict_blockers_visibility
(
query
,
_
),
do
:
query
defp
restrict_unlisted
(
query
,
%{
restrict_unlisted
:
true
})
do
from
(
activity
in
query
,
where
:
fragment
(
"not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)"
,
activity
.
data
,
^
[
Constants
.
as_public
()]
)
)
end
defp
restrict_unlisted
(
query
,
_
),
do
:
query
defp
restrict_pinned
(
query
,
%{
pinned
:
true
,
pinned_object_ids
:
ids
})
do
from
(
[
activity
,
object
:
o
]
in
query
,
where
:
fragment
(
"(?)->>'type' = 'Create' and associated_object_id((?)) = any (?)"
,
activity
.
data
,
activity
.
data
,
^
ids
)
)
end
defp
restrict_pinned
(
query
,
_
),
do
:
query
defp
restrict_muted_reblogs
(
query
,
%{
muting_user
:
%
User
{}
=
user
}
=
opts
)
do
muted_reblogs
=
opts
[
:reblog_muted_users_ap_ids
]
||
User
.
reblog_muted_users_ap_ids
(
user
)
from
(
activity
in
query
,
where
:
fragment
(
"not ( ?->>'type' = 'Announce' and ? = ANY(?))"
,
activity
.
data
,
activity
.
actor
,
^
muted_reblogs
)
)
end
defp
restrict_muted_reblogs
(
query
,
_
),
do
:
query
defp
restrict_instance
(
query
,
%{
instance
:
instance
})
when
is_binary
(
instance
)
do
from
(
activity
in
query
,
where
:
fragment
(
"split_part(actor::text, '/'::text, 3) = ?"
,
^
instance
)
)
end
defp
restrict_instance
(
query
,
_
),
do
:
query
defp
restrict_filtered
(
query
,
%{
user
:
%
User
{}
=
user
})
do
case
Filter
.
compose_regex
(
user
)
do
nil
->
query
regex
->
from
([
activity
,
object
]
in
query
,
where
:
fragment
(
"not(?->>'content' ~* ?)"
,
object
.
data
,
^
regex
)
or
activity
.
actor
==
^
user
.
ap_id
)
end
end
defp
restrict_filtered
(
query
,
%{
blocking_user
:
%
User
{}
=
user
})
do
restrict_filtered
(
query
,
%{
user
:
user
})
end
defp
restrict_filtered
(
query
,
_
),
do
:
query
defp
restrict_unauthenticated
(
query
,
nil
)
do
local
=
Config
.
restrict_unauthenticated_access?
(
:activities
,
:local
)
remote
=
Config
.
restrict_unauthenticated_access?
(
:activities
,
:remote
)
cond
do
local
and
remote
->
from
(
activity
in
query
,
where
:
false
)
local
->
from
(
activity
in
query
,
where
:
activity
.
local
==
false
)
remote
->
from
(
activity
in
query
,
where
:
activity
.
local
==
true
)
true
->
query
end
end
defp
restrict_unauthenticated
(
query
,
_
),
do
:
query
defp
restrict_quote_url
(
query
,
%{
quote_url
:
quote_url
})
do
from
([
_activity
,
object
]
in
query
,
where
:
fragment
(
"(?)->'quoteUrl' = ?"
,
object
.
data
,
^
quote_url
)
)
end
defp
restrict_quote_url
(
query
,
_
),
do
:
query
defp
exclude_poll_votes
(
query
,
%{
include_poll_votes
:
true
}),
do
:
query
defp
exclude_poll_votes
(
query
,
_
)
do
if
has_named_binding?
(
query
,
:object
)
do
from
([
activity
,
object
:
o
]
in
query
,
where
:
fragment
(
"not(?->>'type' = ?)"
,
o
.
data
,
"Answer"
)
)
else
query
end
end
defp
exclude_chat_messages
(
query
,
%{
include_chat_messages
:
true
}),
do
:
query
defp
exclude_chat_messages
(
query
,
_
)
do
if
has_named_binding?
(
query
,
:object
)
do
from
([
activity
,
object
:
o
]
in
query
,
where
:
fragment
(
"not(?->>'type' = ?)"
,
o
.
data
,
"ChatMessage"
)
)
else
query
end
end
defp
exclude_invisible_actors
(
query
,
%{
type
:
"Flag"
}),
do
:
query
defp
exclude_invisible_actors
(
query
,
%{
invisible_actors
:
true
}),
do
:
query
defp
exclude_invisible_actors
(
query
,
_opts
)
do
query
|>
join
(
:inner
,
[
activity
],
u
in
User
,
as
:
:u
,
on
:
activity
.
actor
==
u
.
ap_id
and
u
.
invisible
==
false
)
end
defp
exclude_id
(
query
,
%{
exclude_id
:
id
})
when
is_binary
(
id
)
do
from
(
activity
in
query
,
where
:
activity
.
id
!=
^
id
)
end
defp
exclude_id
(
query
,
_
),
do
:
query
defp
maybe_preload_objects
(
query
,
%{
skip_preload
:
true
}),
do
:
query
defp
maybe_preload_objects
(
query
,
_
)
do
query
|>
Activity
.
with_preloaded_object
()
end
defp
maybe_preload_bookmarks
(
query
,
%{
skip_preload
:
true
}),
do
:
query
defp
maybe_preload_bookmarks
(
query
,
opts
)
do
query
|>
Activity
.
with_preloaded_bookmark
(
opts
[
:user
])
end
defp
maybe_preload_report_notes
(
query
,
%{
preload_report_notes
:
true
})
do
query
|>
Activity
.
with_preloaded_report_notes
()
end
defp
maybe_preload_report_notes
(
query
,
_
),
do
:
query
defp
maybe_set_thread_muted_field
(
query
,
%{
skip_preload
:
true
}),
do
:
query
defp
maybe_set_thread_muted_field
(
query
,
opts
)
do
query
|>
Activity
.
with_set_thread_muted_field
(
opts
[
:muting_user
]
||
opts
[
:user
])
end
defp
maybe_order
(
query
,
%{
order
:
:desc
})
do
query
|>
order_by
(
desc
:
:id
)
end
defp
maybe_order
(
query
,
%{
order
:
:asc
})
do
query
|>
order_by
(
asc
:
:id
)
end
defp
maybe_order
(
query
,
_
),
do
:
query
defp
normalize_fetch_activities_query_opts
(
opts
)
do
Enum
.
reduce
([
:tag
,
:tag_all
,
:tag_reject
],
opts
,
fn
key
,
opts
->
case
opts
[
key
]
do
value
when
is_bitstring
(
value
)
->
Map
.
put
(
opts
,
key
,
Hashtag
.
normalize_name
(
value
))
value
when
is_list
(
value
)
->
normalized_value
=
value
|>
Enum
.
map
(
&
Hashtag
.
normalize_name
/
1
)
|>
Enum
.
uniq
()
Map
.
put
(
opts
,
key
,
normalized_value
)
_
->
opts
end
end
)
end
defp
fetch_activities_query_ap_ids_ops
(
opts
)
do
source_user
=
opts
[
:muting_user
]
ap_id_relationships
=
if
source_user
,
do
:
[
:mute
,
:reblog_mute
],
else
:
[]
ap_id_relationships
=
if
opts
[
:blocking_user
]
&&
opts
[
:blocking_user
]
==
source_user
do
[
:block
|
ap_id_relationships
]
else
ap_id_relationships
end
preloaded_ap_ids
=
User
.
outgoing_relationships_ap_ids
(
source_user
,
ap_id_relationships
)
restrict_blocked_opts
=
Map
.
merge
(%{
blocked_users_ap_ids
:
preloaded_ap_ids
[
:block
]},
opts
)
restrict_muted_opts
=
Map
.
merge
(%{
muted_users_ap_ids
:
preloaded_ap_ids
[
:mute
]},
opts
)
restrict_muted_reblogs_opts
=
Map
.
merge
(%{
reblog_muted_users_ap_ids
:
preloaded_ap_ids
[
:reblog_mute
]},
opts
)
{
restrict_blocked_opts
,
restrict_muted_opts
,
restrict_muted_reblogs_opts
}
end
def
fetch_activities_query
(
recipients
,
opts
\\
%{})
do
opts
=
normalize_fetch_activities_query_opts
(
opts
)
{
restrict_blocked_opts
,
restrict_muted_opts
,
restrict_muted_reblogs_opts
}
=
fetch_activities_query_ap_ids_ops
(
opts
)
config
=
%{
skip_thread_containment
:
Config
.
get
([
:instance
,
:skip_thread_containment
])
}
query
=
Activity
|>
maybe_preload_objects
(
opts
)
|>
maybe_preload_bookmarks
(
opts
)
|>
maybe_preload_report_notes
(
opts
)
|>
maybe_set_thread_muted_field
(
opts
)
|>
maybe_order
(
opts
)
|>
restrict_recipients
(
recipients
,
opts
[
:user
])
|>
restrict_replies
(
opts
)
|>
restrict_since
(
opts
)
|>
restrict_local
(
opts
)
|>
restrict_remote
(
opts
)
|>
restrict_actor
(
opts
)
|>
restrict_type
(
opts
)
|>
restrict_state
(
opts
)
|>
restrict_favorited_by
(
opts
)
|>
restrict_blocked
(
restrict_blocked_opts
)
|>
restrict_blockers_visibility
(
opts
)
|>
restrict_muted
(
restrict_muted_opts
)
|>
restrict_filtered
(
opts
)
|>
restrict_media
(
opts
)
|>
restrict_visibility
(
opts
)
|>
restrict_thread_visibility
(
opts
,
config
)
|>
restrict_reblogs
(
opts
)
|>
restrict_pinned
(
opts
)
|>
restrict_muted_reblogs
(
restrict_muted_reblogs_opts
)
|>
restrict_instance
(
opts
)
|>
restrict_announce_object_actor
(
opts
)
|>
restrict_filtered
(
opts
)
|>
restrict_quote_url
(
opts
)
|>
maybe_restrict_deactivated_users
(
opts
)
|>
exclude_poll_votes
(
opts
)
|>
exclude_chat_messages
(
opts
)
|>
exclude_invisible_actors
(
opts
)
|>
exclude_visibility
(
opts
)
if
Config
.
feature_enabled?
(
:improved_hashtag_timeline
)
do
query
|>
restrict_hashtag_any
(
opts
)
|>
restrict_hashtag_all
(
opts
)
|>
restrict_hashtag_reject_any
(
opts
)
else
query
|>
restrict_embedded_tag_any
(
opts
)
|>
restrict_embedded_tag_all
(
opts
)
|>
restrict_embedded_tag_reject_any
(
opts
)
end
end
@doc
"""
Fetch favorites activities of user with order by sort adds to favorites
"""
@spec
fetch_favourites
(
User
.
t
(),
map
(),
Pagination
.
type
())
::
list
(
Activity
.
t
())
def
fetch_favourites
(
user
,
params
\\
%{},
pagination
\\
:keyset
)
do
user
.
ap_id
|>
Activity.Queries
.
by_actor
()
|>
Activity.Queries
.
by_type
(
"Like"
)
|>
Activity
.
with_joined_object
()
|>
Object
.
with_joined_activity
()
|>
select
([
like
,
object
,
activity
],
%{
activity
|
object
:
object
,
pagination_id
:
like
.
id
})
|>
order_by
([
like
,
_
,
_
],
desc_nulls_last
:
like
.
id
)
|>
Pagination
.
fetch_paginated
(
Map
.
merge
(
params
,
%{
skip_order
:
true
}),
pagination
)
end
defp
maybe_update_cc
(
activities
,
[
_
|
_
]
=
list_memberships
,
%
User
{
ap_id
:
user_ap_id
})
do
Enum
.
map
(
activities
,
fn
%{
data
:
%{
"bcc"
=>
[
_
|
_
]
=
bcc
}}
=
activity
->
if
Enum
.
any?
(
bcc
,
&
(
&1
in
list_memberships
))
do
update_in
(
activity
.
data
[
"cc"
],
&
[
user_ap_id
|
&1
])
else
activity
end
activity
->
activity
end
)
end
defp
maybe_update_cc
(
activities
,
_
,
_
),
do
:
activities
defp
fetch_activities_bounded_query
(
query
,
recipients
,
recipients_with_public
)
do
from
(
activity
in
query
,
where
:
fragment
(
"? && ?"
,
activity
.
recipients
,
^
recipients
)
or
(
fragment
(
"? && ?"
,
activity
.
recipients
,
^
recipients_with_public
)
and
^
Constants
.
as_public
()
in
activity
.
recipients
)
)
end
def
fetch_activities_bounded
(
recipients
,
recipients_with_public
,
opts
\\
%{},
pagination
\\
:keyset
)
do
fetch_activities_query
([],
opts
)
|>
fetch_activities_bounded_query
(
recipients
,
recipients_with_public
)
|>
Pagination
.
fetch_paginated
(
opts
,
pagination
)
|>
Enum
.
reverse
()
end
@spec
upload
(
Upload
.
source
(),
keyword
())
::
{
:ok
,
Object
.
t
()}
|
{
:error
,
any
()}
def
upload
(
file
,
opts
\\
[])
do
with
{
:ok
,
data
}
<-
Upload
.
store
(
sanitize_upload_file
(
file
),
opts
)
do
obj_data
=
Maps
.
put_if_present
(
data
,
"actor"
,
opts
[
:actor
])
Repo
.
insert
(%
Object
{
data
:
obj_data
})
end
end
defp
sanitize_upload_file
(%
Plug.Upload
{
filename
:
filename
}
=
upload
)
when
is_binary
(
filename
)
do
%
Plug.Upload
{
upload
|
filename
:
Path
.
basename
(
filename
)
}
end
defp
sanitize_upload_file
(
upload
),
do
:
upload
@spec
get_actor_url
(
any
())
::
binary
()
|
nil
defp
get_actor_url
(
url
)
when
is_binary
(
url
),
do
:
url
defp
get_actor_url
(%{
"href"
=>
href
})
when
is_binary
(
href
),
do
:
href
defp
get_actor_url
(
url
)
when
is_list
(
url
)
do
url
|>
List
.
first
()
|>
get_actor_url
()
end
defp
get_actor_url
(
_url
),
do
:
nil
defp
normalize_image
(%{
"url"
=>
url
})
do
%{
"type"
=>
"Image"
,
"url"
=>
[%{
"href"
=>
url
}]
}
end
defp
normalize_image
(
urls
)
when
is_list
(
urls
),
do
:
urls
|>
List
.
first
()
|>
normalize_image
()
defp
normalize_image
(
_
),
do
:
nil
defp
object_to_user_data
(
data
,
additional
)
do
fields
=
data
|>
Map
.
get
(
"attachment"
,
[])
|>
Enum
.
filter
(
fn
%{
"type"
=>
t
}
->
t
==
"PropertyValue"
end
)
|>
Enum
.
map
(
fn
fields
->
Map
.
take
(
fields
,
[
"name"
,
"value"
])
end
)
emojis
=
data
|>
Map
.
get
(
"tag"
,
[])
|>
Enum
.
filter
(
fn
%{
"type"
=>
"Emoji"
}
->
true
_
->
false
end
)
|>
Map
.
new
(
fn
%{
"icon"
=>
%{
"url"
=>
url
},
"name"
=>
name
}
->
{
String
.
trim
(
name
,
":"
),
url
}
end
)
is_locked
=
data
[
"manuallyApprovesFollowers"
]
||
false
capabilities
=
data
[
"capabilities"
]
||
%{}
accepts_chat_messages
=
capabilities
[
"acceptsChatMessages"
]
data
=
Transmogrifier
.
maybe_fix_user_object
(
data
)
is_discoverable
=
data
[
"discoverable"
]
||
false
invisible
=
data
[
"invisible"
]
||
false
actor_type
=
data
[
"type"
]
||
"Person"
featured_address
=
data
[
"featured"
]
{
:ok
,
pinned_objects
}
=
fetch_and_prepare_featured_from_ap_id
(
featured_address
)
public_key
=
if
is_map
(
data
[
"publicKey"
])
&&
is_binary
(
data
[
"publicKey"
][
"publicKeyPem"
])
do
data
[
"publicKey"
][
"publicKeyPem"
]
end
shared_inbox
=
if
is_map
(
data
[
"endpoints"
])
&&
is_binary
(
data
[
"endpoints"
][
"sharedInbox"
])
do
data
[
"endpoints"
][
"sharedInbox"
]
end
birthday
=
if
is_binary
(
data
[
"vcard:bday"
])
do
case
Date
.
from_iso8601
(
data
[
"vcard:bday"
])
do
{
:ok
,
date
}
->
date
{
:error
,
_
}
->
nil
end
end
show_birthday
=
!!
birthday
# if WebFinger request was already done, we probably have acct, otherwise
# we request WebFinger here
nickname
=
additional
[
:nickname_from_acct
]
||
generate_nickname
(
data
)
%{
ap_id
:
data
[
"id"
],
uri
:
get_actor_url
(
data
[
"url"
]),
banner
:
normalize_image
(
data
[
"image"
]),
fields
:
fields
,
emoji
:
emojis
,
is_locked
:
is_locked
,
is_discoverable
:
is_discoverable
,
invisible
:
invisible
,
avatar
:
normalize_image
(
data
[
"icon"
]),
name
:
data
[
"name"
],
follower_address
:
data
[
"followers"
],
following_address
:
data
[
"following"
],
featured_address
:
featured_address
,
bio
:
data
[
"summary"
]
||
""
,
actor_type
:
actor_type
,
also_known_as
:
Map
.
get
(
data
,
"alsoKnownAs"
,
[]),
public_key
:
public_key
,
inbox
:
data
[
"inbox"
],
shared_inbox
:
shared_inbox
,
accepts_chat_messages
:
accepts_chat_messages
,
birthday
:
birthday
,
show_birthday
:
show_birthday
,
pinned_objects
:
pinned_objects
,
nickname
:
nickname
}
end
defp
generate_nickname
(%{
"preferredUsername"
=>
username
}
=
data
)
when
is_binary
(
username
)
do
generated
=
"
#{
username
}
@
#{
URI
.
parse
(
data
[
"id"
])
.
host
}
"
if
Config
.
get
([
WebFinger
,
:update_nickname_on_user_fetch
])
do
case
WebFinger
.
finger
(
generated
)
do
{
:ok
,
%{
"subject"
=>
"acct:"
<>
acct
}}
->
acct
_
->
generated
end
else
generated
end
end
# nickname can be nil because of virtual actors
defp
generate_nickname
(
_
),
do
:
nil
def
fetch_follow_information_for_user
(
user
)
do
with
{
:ok
,
following_data
}
<-
Fetcher
.
fetch_and_contain_remote_object_from_id
(
user
.
following_address
),
{
:ok
,
hide_follows
}
<-
collection_private
(
following_data
),
{
:ok
,
followers_data
}
<-
Fetcher
.
fetch_and_contain_remote_object_from_id
(
user
.
follower_address
),
{
:ok
,
hide_followers
}
<-
collection_private
(
followers_data
)
do
{
:ok
,
%{
hide_follows
:
hide_follows
,
follower_count
:
normalize_counter
(
followers_data
[
"totalItems"
]),
following_count
:
normalize_counter
(
following_data
[
"totalItems"
]),
hide_followers
:
hide_followers
}}
else
{
:error
,
_
}
=
e
->
e
e
->
{
:error
,
e
}
end
end
defp
normalize_counter
(
counter
)
when
is_integer
(
counter
),
do
:
counter
defp
normalize_counter
(
_
),
do
:
0
def
maybe_update_follow_information
(
user_data
)
do
with
{
:enabled
,
true
}
<-
{
:enabled
,
Config
.
get
([
:instance
,
:external_user_synchronization
])},
{
_
,
true
}
<-
{
:user_type_check
,
user_data
[
:type
]
in
[
"Person"
,
"Service"
]},
{
_
,
true
}
<-
{
:collections_available
,
!!
(
user_data
[
:following_address
]
&&
user_data
[
:follower_address
])},
{
:ok
,
info
}
<-
fetch_follow_information_for_user
(
user_data
)
do
info
=
Map
.
merge
(
user_data
[
:info
]
||
%{},
info
)
user_data
|>
Map
.
put
(
:info
,
info
)
else
{
:user_type_check
,
false
}
->
user_data
{
:collections_available
,
false
}
->
user_data
{
:enabled
,
false
}
->
user_data
e
->
Logger
.
error
(
"Follower/Following counter update for
#{
user_data
.
ap_id
}
failed.
\n
"
<>
inspect
(
e
)
)
user_data
end
end
defp
collection_private
(%{
"first"
=>
%{
"type"
=>
type
}})
when
type
in
[
"CollectionPage"
,
"OrderedCollectionPage"
],
do
:
{
:ok
,
false
}
defp
collection_private
(%{
"first"
=>
first
})
do
with
{
:ok
,
%{
"type"
=>
type
}}
when
type
in
[
"CollectionPage"
,
"OrderedCollectionPage"
]
<-
Fetcher
.
fetch_and_contain_remote_object_from_id
(
first
)
do
{
:ok
,
false
}
else
{
:error
,
_
}
->
{
:ok
,
true
}
end
end
defp
collection_private
(
_data
),
do
:
{
:ok
,
true
}
def
user_data_from_user_object
(
data
,
additional
\\
[])
do
with
{
:ok
,
data
}
<-
MRF
.
filter
(
data
)
do
{
:ok
,
object_to_user_data
(
data
,
additional
)}
else
e
->
{
:error
,
e
}
end
end
defp
fetch_and_prepare_user_from_ap_id
(
ap_id
,
additional
)
do
with
{
:ok
,
data
}
<-
Fetcher
.
fetch_and_contain_remote_object_from_id
(
ap_id
),
{
:ok
,
data
}
<-
user_data_from_user_object
(
data
,
additional
)
do
{
:ok
,
maybe_update_follow_information
(
data
)}
else
# If this has been deleted, only log a debug and not an error
{
:error
,
"Object has been deleted"
=
e
}
->
Logger
.
debug
(
"Could not decode user at fetch
#{
ap_id
}
,
#{
inspect
(
e
)
}
"
)
{
:error
,
e
}
{
:error
,
{
:reject
,
reason
}
=
e
}
->
Logger
.
info
(
"Rejected user
#{
ap_id
}
:
#{
inspect
(
reason
)
}
"
)
{
:error
,
e
}
{
:error
,
e
}
->
Logger
.
error
(
"Could not decode user at fetch
#{
ap_id
}
,
#{
inspect
(
e
)
}
"
)
{
:error
,
e
}
end
end
def
maybe_handle_clashing_nickname
(
data
)
do
with
nickname
when
is_binary
(
nickname
)
<-
data
[
:nickname
],
%
User
{}
=
old_user
<-
User
.
get_by_nickname
(
nickname
),
{
_
,
false
}
<-
{
:ap_id_comparison
,
data
[
:ap_id
]
==
old_user
.
ap_id
}
do
Logger
.
info
(
"Found an old user for
#{
nickname
}
, the old ap id is
#{
old_user
.
ap_id
}
, new one is
#{
data
[
:ap_id
]
}
, renaming."
)
old_user
|>
User
.
remote_user_changeset
(%{
nickname
:
"
#{
old_user
.
id
}
.
#{
old_user
.
nickname
}
"
})
|>
User
.
update_and_set_cache
()
else
{
:ap_id_comparison
,
true
}
->
Logger
.
info
(
"Found an old user for
#{
data
[
:nickname
]
}
, but the ap id
#{
data
[
:ap_id
]
}
is the same as the new user. Race condition? Not changing anything."
)
_
->
nil
end
end
def
pin_data_from_featured_collection
(%{
"type"
=>
type
,
"orderedItems"
=>
objects
})
when
type
in
[
"OrderedCollection"
,
"Collection"
]
do
Map
.
new
(
objects
,
fn
%{
"id"
=>
object_ap_id
}
->
{
object_ap_id
,
NaiveDateTime
.
utc_now
()}
object_ap_id
when
is_binary
(
object_ap_id
)
->
{
object_ap_id
,
NaiveDateTime
.
utc_now
()}
end
)
end
def
pin_data_from_featured_collection
(
obj
)
do
Logger
.
error
(
"Could not parse featured collection
#{
inspect
(
obj
)
}
"
)
%{}
end
def
fetch_and_prepare_featured_from_ap_id
(
nil
)
do
{
:ok
,
%{}}
end
def
fetch_and_prepare_featured_from_ap_id
(
ap_id
)
do
with
{
:ok
,
data
}
<-
Fetcher
.
fetch_and_contain_remote_object_from_id
(
ap_id
)
do
{
:ok
,
pin_data_from_featured_collection
(
data
)}
else
e
->
Logger
.
error
(
"Could not decode featured collection at fetch
#{
ap_id
}
,
#{
inspect
(
e
)
}
"
)
{
:ok
,
%{}}
end
end
def
pinned_fetch_task
(
nil
),
do
:
nil
def
pinned_fetch_task
(%{
pinned_objects
:
pins
})
do
if
Enum
.
all?
(
pins
,
fn
{
ap_id
,
_
}
->
Object
.
get_cached_by_ap_id
(
ap_id
)
||
match?
({
:ok
,
_object
},
Fetcher
.
fetch_object_from_id
(
ap_id
))
end
)
do
:ok
else
:error
end
end
def
make_user_from_ap_id
(
ap_id
,
additional
\\
[])
do
user
=
User
.
get_cached_by_ap_id
(
ap_id
)
with
{
:ok
,
data
}
<-
fetch_and_prepare_user_from_ap_id
(
ap_id
,
additional
)
do
{
:ok
,
_pid
}
=
Task
.
start
(
fn
->
pinned_fetch_task
(
data
)
end
)
if
user
do
user
|>
User
.
remote_user_changeset
(
data
)
|>
User
.
update_and_set_cache
()
else
maybe_handle_clashing_nickname
(
data
)
data
|>
User
.
remote_user_changeset
()
|>
Repo
.
insert
()
|>
User
.
set_cache
()
end
end
end
def
make_user_from_nickname
(
nickname
)
do
with
{
:ok
,
%{
"ap_id"
=>
ap_id
,
"subject"
=>
"acct:"
<>
acct
}}
when
not
is_nil
(
ap_id
)
<-
WebFinger
.
finger
(
nickname
)
do
make_user_from_ap_id
(
ap_id
,
nickname_from_acct
:
acct
)
else
_e
->
{
:error
,
"No AP id in WebFinger"
}
end
end
# filter out broken threads
defp
contain_broken_threads
(%
Activity
{}
=
activity
,
%
User
{}
=
user
)
do
entire_thread_visible_for_user?
(
activity
,
user
)
end
# do post-processing on a specific activity
def
contain_activity
(%
Activity
{}
=
activity
,
%
User
{}
=
user
)
do
contain_broken_threads
(
activity
,
user
)
end
def
fetch_direct_messages_query
do
Activity
|>
restrict_type
(%{
type
:
"Create"
})
|>
restrict_visibility
(%{
visibility
:
"direct"
})
|>
order_by
([
activity
],
asc
:
activity
.
id
)
end
defp
maybe_restrict_deactivated_users
(
activity
,
%{
type
:
"Flag"
}),
do
:
activity
defp
maybe_restrict_deactivated_users
(
activity
,
_opts
),
do
:
Activity
.
restrict_deactivated_users
(
activity
)
end
File Metadata
Details
Attached
Mime Type
text/x-ruby
Expires
Mon, Jun 29, 9:21 AM (1 d, 10 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1632465
Default Alt Text
activity_pub.ex (54 KB)
Attached To
Mode
rPUBE pleroma-upstream
Attached
Detach File
Event Timeline
Log In to Comment