Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F56225009
context_objects_deletion_migrator.ex
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Size
4 KB
Referenced Files
None
Subscribers
None
context_objects_deletion_migrator.ex
View Options
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule
Pleroma.Migrators.ContextObjectsDeletionMigrator
do
defmodule
State
do
use
Pleroma.Migrators.Support.BaseMigratorState
@impl
Pleroma.Migrators.Support.BaseMigratorState
defdelegate
data_migration
(),
to
:
Pleroma.DataMigration
,
as
:
:delete_context_objects
end
use
Pleroma.Migrators.Support.BaseMigrator
alias
Pleroma.Migrators.Support.BaseMigrator
alias
Pleroma.Object
@doc
"This migration removes objects created exclusively for contexts, containing only an `id` field."
@impl
BaseMigrator
def
feature_config_path
,
do
:
[
:features
,
:delete_context_objects
]
@impl
BaseMigrator
def
fault_rate_allowance
,
do
:
Config
.
get
([
:delete_context_objects
,
:fault_rate_allowance
],
0
)
@impl
BaseMigrator
def
perform
do
data_migration_id
=
data_migration_id
()
max_processed_id
=
get_stat
(
:max_processed_id
,
0
)
Logger
.
info
(
"Deleting context objects from `objects` (from oid:
#{
max_processed_id
}
)..."
)
query
()
|>
where
([
object
],
object
.
id
>
^
max_processed_id
)
|>
Repo
.
chunk_stream
(
100
,
:batches
,
timeout
:
:infinity
)
|>
Stream
.
each
(
fn
objects
->
object_ids
=
Enum
.
map
(
objects
,
&
&1
.
id
)
results
=
Enum
.
map
(
object_ids
,
&
delete_context_object
(
&1
))
failed_ids
=
results
|>
Enum
.
filter
(
&
(
elem
(
&1
,
0
)
==
:error
))
|>
Enum
.
map
(
&
elem
(
&1
,
1
))
chunk_affected_count
=
results
|>
Enum
.
filter
(
&
(
elem
(
&1
,
0
)
==
:ok
))
|>
length
()
for
failed_id
<-
failed_ids
do
_
=
Repo
.
query
(
"INSERT INTO data_migration_failed_ids(data_migration_id, record_id) "
<>
"VALUES ($1, $2) ON CONFLICT DO NOTHING;"
,
[
data_migration_id
,
failed_id
]
)
end
_
=
Repo
.
query
(
"DELETE FROM data_migration_failed_ids "
<>
"WHERE data_migration_id = $1 AND record_id = ANY($2)"
,
[
data_migration_id
,
object_ids
--
failed_ids
]
)
max_object_id
=
Enum
.
at
(
object_ids
,
-
1
)
put_stat
(
:max_processed_id
,
max_object_id
)
increment_stat
(
:iteration_processed_count
,
length
(
object_ids
))
increment_stat
(
:processed_count
,
length
(
object_ids
))
increment_stat
(
:failed_count
,
length
(
failed_ids
))
increment_stat
(
:affected_count
,
chunk_affected_count
)
put_stat
(
:records_per_second
,
records_per_second
())
persist_state
()
# A quick and dirty approach to controlling the load this background migration imposes
sleep_interval
=
Config
.
get
([
:delete_context_objects
,
:sleep_interval_ms
],
0
)
Process
.
sleep
(
sleep_interval
)
end
)
|>
Stream
.
run
()
end
@impl
BaseMigrator
def
query
do
# Context objects have no activity type, and only one field, `id`.
# Only those context objects are without types.
from
(
object
in
Object
,
where
:
fragment
(
"(?)->'type' IS NULL"
,
object
.
data
),
select
:
%{
id
:
object
.
id
}
)
end
@spec
delete_context_object
(
integer
())
::
{
:ok
|
:error
,
integer
()}
defp
delete_context_object
(
id
)
do
result
=
%
Object
{
id
:
id
}
|>
Repo
.
delete
()
|>
elem
(
0
)
{
result
,
id
}
end
@impl
BaseMigrator
def
retry_failed
do
data_migration_id
=
data_migration_id
()
failed_objects_query
()
|>
Repo
.
chunk_stream
(
100
,
:one
)
|>
Stream
.
each
(
fn
object
->
with
{
res
,
_
}
when
res
!=
:error
<-
delete_context_object
(
object
.
id
)
do
_
=
Repo
.
query
(
"DELETE FROM data_migration_failed_ids "
<>
"WHERE data_migration_id = $1 AND record_id = $2"
,
[
data_migration_id
,
object
.
id
]
)
end
end
)
|>
Stream
.
run
()
put_stat
(
:failed_count
,
failures_count
())
persist_state
()
force_continue
()
end
defp
failed_objects_query
do
from
(
o
in
Object
)
|>
join
(
:inner
,
[
o
],
dmf
in
fragment
(
"SELECT * FROM data_migration_failed_ids"
),
on
:
dmf
.
record_id
==
o
.
id
)
|>
where
([
_o
,
dmf
],
dmf
.
data_migration_id
==
^
data_migration_id
())
|>
order_by
([
o
],
asc
:
o
.
id
)
end
end
File Metadata
Details
Attached
Mime Type
text/x-ruby
Expires
Fri, Apr 3, 9:44 PM (1 h, 24 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1150072
Default Alt Text
context_objects_deletion_migrator.ex (4 KB)
Attached To
Mode
rPUBE pleroma-upstream
Attached
Detach File
Event Timeline
Log In to Comment