Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F8251924
D237.1760690994.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Size
17 KB
Referenced Files
None
Subscribers
None
D237.1760690994.diff
View Options
diff --git a/src/client/actions/storage.hpp b/src/client/actions/storage.hpp
--- a/src/client/actions/storage.hpp
+++ b/src/client/actions/storage.hpp
@@ -11,4 +11,5 @@
namespace Kazv
{
[[nodiscard]] ClientResult updateClient(ClientModel m, LoadEventsFromStorageAction a);
+ [[nodiscard]] ClientResult updateClient(ClientModel m, PurgeRoomTimelineAction a);
}
diff --git a/src/client/actions/storage.cpp b/src/client/actions/storage.cpp
--- a/src/client/actions/storage.cpp
+++ b/src/client/actions/storage.cpp
@@ -41,4 +41,15 @@
return { m, lager::noop };
}
+
+ /// Reducer for PurgeRoomTimelineAction.
+ ///
+ /// Sends one PurgeEventsAction to every room listed in
+ /// `a.roomIdToMaxToKeepMap`, using the mapped value as that room's
+ /// `maxToKeep`. No action is issued for rooms that are not keys of the map.
+ ///
+ /// @param m The client model to update.
+ /// @param a The action carrying the roomId -> maxToKeep map.
+ /// @return The updated model paired with `lager::noop`.
+ ClientResult updateClient(ClientModel m, PurgeRoomTimelineAction a)
+ {
+     for (const auto &[roomId, maxToKeep] : a.roomIdToMaxToKeepMap) {
+         m.roomList = RoomListModel::update(
+             std::move(m.roomList),
+             UpdateRoomAction{roomId, PurgeEventsAction{maxToKeep}}
+         );
+     }
+     // `m` is a by-value parameter that is dead after this statement, so it
+     // is moved into the result instead of copying the whole client model.
+     return { std::move(m), lager::noop };
+ }
}
diff --git a/src/client/client-model.hpp b/src/client/client-model.hpp
--- a/src/client/client-model.hpp
+++ b/src/client/client-model.hpp
@@ -594,6 +594,14 @@
immer::map<std::string, EventList> relatedEvents;
};
+ /// Remove events from the model, keeping only the latest `maxToKeep` events
+ /// in each of the rooms listed in `roomIdToMaxToKeepMap`.
+ /// For each room, this takes O(maxToKeep * log(maxToKeep)) time.
+ struct PurgeRoomTimelineAction
+ {
+ /// A map from roomId to maxToKeep, i.e. the maximum number of
+ /// timeline events to keep in that room.
+ immer::map<std::string, std::size_t> roomIdToMaxToKeepMap;
+ };
+
template<class Archive>
void serialize(Archive &ar, ClientModel &m, std::uint32_t const version)
{
diff --git a/src/client/client.hpp b/src/client/client.hpp
--- a/src/client/client.hpp
+++ b/src/client/client.hpp
@@ -603,6 +603,18 @@
*/
BaseJob getRoomIdByAliasJob(std::string roomAlias) const;
+ /**
+ * Purge events in the given rooms, keeping only the latest events of each.
+ *
+ * For every room that is a key of `roomIdToMaxToKeepMap`, the events are
+ * removed from the lager store. The timeline of that room will contain
+ * at most the mapped number of events, but the `messages` property
+ * may contain more in order to maintain the room invariants.
+ * @sa RoomModel
+ *
+ * @param roomIdToMaxToKeepMap A map from "room id" to "max number of timeline events to keep."
+ * @return The promise returned by dispatching PurgeRoomTimelineAction.
+ */
+ PromiseT purgeRoomEvents(immer::map<std::string, std::size_t> roomIdToMaxToKeepMap) const;
+
private:
void syncForever(std::optional<int> retryTime = std::nullopt) const;
diff --git a/src/client/client.cpp b/src/client/client.cpp
--- a/src/client/client.cpp
+++ b/src/client/client.cpp
@@ -500,4 +500,9 @@
{
return Kazv::getRoomIdByAliasJob(clientCursor().get(), roomAlias);
}
+
+ // Wraps the roomId -> maxToKeep map in a PurgeRoomTimelineAction and
+ // dispatches it through m_ctx; the promise from dispatch() is returned as is.
+ auto Client::purgeRoomEvents(immer::map<std::string, std::size_t> roomIdToMaxToKeepMap) const -> PromiseT
+ {
+ return m_ctx.dispatch(PurgeRoomTimelineAction{roomIdToMaxToKeepMap});
+ }
}
diff --git a/src/client/clientfwd.hpp b/src/client/clientfwd.hpp
--- a/src/client/clientfwd.hpp
+++ b/src/client/clientfwd.hpp
@@ -79,6 +79,7 @@
struct ResubmitJobAction;
struct LoadEventsFromStorageAction;
+ struct PurgeRoomTimelineAction;
struct ClientModel;
@@ -145,7 +146,8 @@
ResubmitJobAction,
- LoadEventsFromStorageAction
+ LoadEventsFromStorageAction,
+ PurgeRoomTimelineAction
>;
using ClientEffect = Effect<ClientAction, lager::deps<>>;
diff --git a/src/client/room/room-model.hpp b/src/client/room/room-model.hpp
--- a/src/client/room/room-model.hpp
+++ b/src/client/room/room-model.hpp
@@ -180,6 +180,12 @@
std::string myUserId;
};
+ /// Remove events from the model, retaining only the latest `maxToKeep` events.
+ ///
+ /// "Latest" refers to position in the room's `timeline`: the leading
+ /// entries are dropped until at most `maxToKeep` remain. A room whose
+ /// timeline already has no more than `maxToKeep` entries is left unchanged.
+ struct PurgeEventsAction
+ {
+ /// Upper bound on the number of timeline events left after purging.
+ std::size_t maxToKeep;
+ };
+
inline bool operator==(const PendingRoomKeyEvent &a, const PendingRoomKeyEvent &b)
{
return a.txnId == b.txnId && a.messages == b.messages;
@@ -220,6 +226,16 @@
*/
auto sortKeyForTimelineEvent(Event e) -> std::tuple<Timestamp, std::string>;
+ /**
+ * The model to store information about a room.
+ *
+ * Room invariants:
+ * Any event in timeline is in messages.
+ * Any event in undecryptedEvents is in messages.
+ * Any event in unreadNotificationEventIds is in messages.
+ * Any relater (i.e. child) event in reverseEventRelationships is in messages.
+ * localReadMarker (if not empty) is in messages.
+ */
struct RoomModel
{
using Membership = RoomMembership;
@@ -335,6 +351,12 @@
void recalculateUndecryptedEvents();
+ /**
+ * Check if the invariants in the model are satisfied.
+ * @return true iff the invariants are satisfied.
+ */
+ bool checkInvariants() const;
+
using Action = std::variant<
AddStateEventsAction,
MaybeAddStateEventsAction,
@@ -356,7 +378,8 @@
UpdateInvitedMemberCountAction,
AddLocalNotificationsAction,
RemoveReadLocalNotificationsAction,
- UpdateLocalReadMarkerAction
+ UpdateLocalReadMarkerAction,
+ PurgeEventsAction
>;
static RoomModel update(RoomModel r, Action a);
diff --git a/src/client/room/room-model.cpp b/src/client/room/room-model.cpp
--- a/src/client/room/room-model.cpp
+++ b/src/client/room/room-model.cpp
@@ -417,6 +417,44 @@
r.localReadMarker = a.localReadMarker;
auto next = RoomModel::update(std::move(r), RemoveReadLocalNotificationsAction{a.myUserId});
return next;
+ },
+ // Trim the room to its latest `a.maxToKeep` timeline events. Events referenced
+ // by localReadMarker or unreadNotificationEventIds are kept in `messages`
+ // even when they fall outside the retained timeline window.
+ [&](PurgeEventsAction a) {
+ // Nothing to purge: the timeline already fits within the limit.
+ if (r.timeline.size() <= a.maxToKeep) {
+ return r;
+ }
+ auto numToDrop = r.timeline.size() - a.maxToKeep;
+ // Resolve the retained timeline ids to full events now, while
+ // r.messages still holds them (it is reset a few lines below).
+ auto keepEvents = intoImmer(
+ EventList{},
+ zug::map([&r](const auto &eventId) {
+ return r.messages[eventId];
+ }),
+ std::move(r.timeline).drop(numToDrop)
+ );
+ // Keep a handle on the old messages so the read-marker and unread
+ // notification events can still be looked up after the reset.
+ auto origMessages = r.messages;
+ // Clear everything that refers to events; the two update() calls at the
+ // end re-add the surviving events (presumably those handlers repopulate
+ // relationships and undecryptedEvents -- they are not visible here).
+ r.reverseEventRelationships = {};
+ r.timeline = {};
+ r.messages = {};
+ r.undecryptedEvents = {};
+
+ // Events that must stay in `messages` without being in the timeline:
+ // the local read marker (seeded as the initial list, if set) followed by
+ // the unread notification events. The filter skips the read marker id so
+ // that event is not appended a second time.
+ auto msgs = intoImmer(
+ r.localReadMarker.empty() ? EventList{} : EventList{origMessages[r.localReadMarker]},
+ zug::filter([l=r.localReadMarker](const auto &eventId) {
+ return eventId != l;
+ })
+ | zug::map([&origMessages](const auto &eventId) {
+ return origMessages[eventId];
+ }),
+ r.unreadNotificationEventIds
+ );
+
+ auto next = update(std::move(r), AddMessagesAction{msgs});
+
+ // Re-add the retained events as the timeline; the three optional
+ // arguments of AddToTimelineAction are left unset.
+ return update(std::move(next), AddToTimelineAction{
+ keepEvents,
+ std::nullopt,
+ std::nullopt,
+ std::nullopt,
+ });
+ }
);
}
@@ -709,4 +747,24 @@
return kv.second;
}), messages));
}
+
+ /// Verify that every event id referenced by timeline,
+ /// unreadNotificationEventIds, undecryptedEvents, localReadMarker and
+ /// reverseEventRelationships is a key of `messages`.
+ /// @return true iff all of those references resolve.
+ bool RoomModel::checkInvariants() const
+ {
+     // Every invariant boils down to "this event id is a key of `messages`".
+     auto known = [this](const std::string &id) {
+         return messages.count(id) != 0;
+     };
+
+     if (!immer::all_of(timeline, known)) {
+         return false;
+     }
+     if (!immer::all_of(unreadNotificationEventIds, known)) {
+         return false;
+     }
+     for (const auto &entry : undecryptedEvents) {
+         if (!immer::all_of(entry.second, known)) {
+             return false;
+         }
+     }
+     // An empty read marker means "not set" and is always acceptable.
+     if (!localReadMarker.empty() && !known(localReadMarker)) {
+         return false;
+     }
+     // Outer map value: relation type -> list of relater event ids.
+     for (const auto &byRelType : reverseEventRelationships) {
+         for (const auto &relaters : byRelType.second) {
+             if (!immer::all_of(relaters.second, known)) {
+                 return false;
+             }
+         }
+     }
+     return true;
+ }
}
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -74,6 +74,7 @@
client/room/local-echo-test.cpp
client/room/event-relationships-test.cpp
client/room/member-membership-test.cpp
+ client/room/purge-test.cpp
client/push-rules-desc-test.cpp
client/notification-handler-test.cpp
client/validator-test.cpp
diff --git a/src/tests/client/action-mock-utils.hpp b/src/tests/client/action-mock-utils.hpp
--- a/src/tests/client/action-mock-utils.hpp
+++ b/src/tests/client/action-mock-utils.hpp
@@ -14,6 +14,11 @@
#include <zug/transducer/filter.hpp>
#include <store/store.hpp>
#include <client/client.hpp>
+#include <client/sdk.hpp>
+#include <cprjobhandler.hpp>
+#include <lagerstoreeventemitter.hpp>
+#include <asio-promise-handler.hpp>
+#include <lager/event_loop/boost_asio.hpp>
template<class Action>
struct PassDownTag
@@ -307,3 +312,48 @@
{
return ContextT(std::forward<Func>(func), ph, lager::deps<>{});
};
+
+/// Test fixture that owns an io_context, the handlers built on it and an SDK
+/// created with makeSdk(), and that can hand out a Kazv::Client whose
+/// dispatches are routed through a mock dispatcher.
+struct MockSdkUtil
+{
+ using PH = Kazv::AsioPromiseHandler<boost::asio::io_context::executor_type>;
+ using ContextT = Kazv::Context<Kazv::SdkAction>;
+ // The type returned by makeSdk() for these argument types; declref<>() is
+ // only used inside decltype to name the argument types.
+ using SdkT = decltype(Kazv::makeSdk(
+ Kazv::SdkModel{},
+ Kazv::detail::declref<Kazv::CprJobHandler>(),
+ Kazv::detail::declref<Kazv::LagerStoreEventEmitter>(),
+ Kazv::detail::declref<PH>(),
+ zug::identity));
+ /// @param m The client model used to build the initial SdkModel.
+ MockSdkUtil(Kazv::ClientModel m)
+ : io()
+ , jh{io.get_executor()}
+ , ee(lager::with_boost_asio_event_loop{io.get_executor()})
+ , ph{io.get_executor()}
+ , sdk(Kazv::makeSdk(Kazv::SdkModel{m}, jh, ee, ph, zug::identity))
+ , ctx(sdk.context())
+ {}
+
+ /// Forward to the free ::getMockDispatcher() with this fixture's promise
+ /// handler and SDK context, plus the given handlers.
+ template<class ...Handlers>
+ auto getMockDispatcher(Handlers &&...handlers)
+ {
+ return ::getMockDispatcher(ph, ctx, std::forward<Handlers>(handlers)...);
+ }
+
+ /// Build a Client from a mock context made out of dispatcher `d` together
+ /// with the real SDK context.
+ template<class MD>
+ auto getClient(MD &d)
+ {
+ auto mockContext = getMockContext(ph, d);
+ return Kazv::Client(Kazv::Client::InEventLoopTag{}, mockContext, sdk.context());
+ }
+
+ // Members are initialized in declaration order. The initializers of jh, ee
+ // and ph use io, sdk is built from jh, ee and ph, and ctx comes from sdk,
+ // so this order must be preserved.
+ boost::asio::io_context io;
+ Kazv::CprJobHandler jh;
+ Kazv::LagerStoreEventEmitter ee;
+ PH ph;
+ SdkT sdk;
+ ContextT ctx;
+};
+
+/// Convenience factory for MockSdkUtil.
+///
+/// Declared `inline` because this definition lives in a header: a non-inline
+/// function defined here would be emitted once per including translation
+/// unit, which is a multiple-definition error as soon as two test sources
+/// that include this header are linked together.
+///
+/// @param m The client model passed to the MockSdkUtil constructor.
+inline MockSdkUtil makeMockSdkUtil(Kazv::ClientModel m)
+{
+    return MockSdkUtil(m);
+}
diff --git a/src/tests/client/room/purge-test.cpp b/src/tests/client/room/purge-test.cpp
new file mode 100644
--- /dev/null
+++ b/src/tests/client/room/purge-test.cpp
@@ -0,0 +1,169 @@
+/*
+ * This file is part of libkazv.
+ * SPDX-FileCopyrightText: 2025 tusooa <tusooa@kazv.moe>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#include <libkazv-config.hpp>
+#include "client-test-util.hpp"
+#include "action-mock-utils.hpp"
+#include "factory.hpp"
+#include <client.hpp>
+#include <catch2/catch_test_macros.hpp>
+#include <zug/transducer/repeat.hpp>
+#include <zug/transducer/enumerate.hpp>
+
+using namespace Kazv;
+using namespace Kazv::Factory;
+
+/// Build an `m.room.encrypted` event with a fixed dummy megolm payload.
+/// Only `session_id` varies, taken from @p sessionId.
+inline static Event makeEncryptedEvent(std::string sessionId = "xxx"s)
+{
+    auto asEncrypted =
+        withEventType("m.room.encrypted")
+        | withEventContent(json{
+            {"algorithm", "m.megolm.v1.aes-sha2"},
+            {"sender_key", "yyy"},
+            {"device_id", "device1"},
+            {"session_id", sessionId},
+            {"ciphertext", "AAA"},
+        });
+    return makeEvent(asEncrypted);
+}
+
+// Exercises the PurgeEventsAction handler of RoomModel::update directly.
+// Every scenario starts from a room with a 100-event timeline and checks
+// the room invariants after purging.
+TEST_CASE("PurgeEventsAction", "[client][room][purge]")
+{
+ // Keeping 20 of 100 events leaves exactly 20 timeline entries.
+ WHEN("simple purging") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ }
+
+ // maxToKeep (120) exceeds the timeline size (100): nothing is removed.
+ WHEN("nothing to purge") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ auto next = RoomModel::update(r, PurgeEventsAction{120});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 100);
+ }
+
+ // Even-indexed events are encrypted, each with its own session id. The
+ // expected 10 entries match the even indices among the last 20 events
+ // (assuming withRoomTimeline keeps the order of `events`).
+ WHEN("rewrites undecryptedEvents") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::enumerate
+ | zug::map([](const auto &i, const auto &) {
+ return i % 2
+ ? makeEvent()
+ : makeEncryptedEvent("sess" + std::to_string(i));
+ })
+ );
+ RoomModel r = makeRoom(withRoomEncrypted(true) | withRoomTimeline(events));
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ REQUIRE(next.undecryptedEvents.size() == 10);
+ }
+
+ // Events 0, 25 and 70 are outside the retained window but must survive in
+ // `messages` because they are unread notifications; 81 and 95 are already
+ // among the last 20. Hence 20 + 3 = 23 messages.
+ WHEN("keep unreadNotificationEventIds") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ r.unreadNotificationEventIds = {
+ events[0].id(),
+ events[25].id(),
+ events[70].id(),
+ events[81].id(),
+ events[95].id(),
+ };
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ REQUIRE(next.messages.size() == 23);
+ REQUIRE(next.messages.count(events[0].id()));
+ REQUIRE(next.messages.count(events[25].id()));
+ REQUIRE(next.messages.count(events[70].id()));
+ }
+
+ // The read marker (event 70) is outside the retained window and is kept in
+ // `messages` only: 20 + 1 = 21 messages.
+ WHEN("keep localReadMarker") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ r.localReadMarker = events[70].id();
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ REQUIRE(next.messages.size() == 21);
+ REQUIRE(next.messages.count(events[70].id()));
+ }
+
+ // Relaters 10 and 26 are purged, so their relationships must disappear;
+ // relaters 85, 88 and 90 are retained and must still be listed under the
+ // events they relate to (9 and 20), even though those targets are gone.
+ WHEN("rewrites relationships") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ events = events.set(10, makeEvent(withEventId(events[10].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[9].id())));
+ events = events.set(26, makeEvent(withEventId(events[26].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[8].id())));
+ events = events.set(85, makeEvent(withEventId(events[85].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[9].id())));
+ events = events.set(88, makeEvent(withEventId(events[88].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[20].id())));
+ events = events.set(90, makeEvent(withEventId(events[90].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[9].id())));
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ r.localReadMarker = events[70].id();
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ REQUIRE(next.messages.size() == 21);
+ REQUIRE(next.messages.count(events[70].id()));
+ auto expected = RoomModel::ReverseEventRelationshipMap{
+ {events[9].id(), {{"moe.kazv.mxc.some-type", {events[85].id(), events[90].id()}}}},
+ {events[20].id(), {{"moe.kazv.mxc.some-type", {events[88].id()}}}},
+ };
+ REQUIRE(next.reverseEventRelationships == expected);
+ }
+}
+
+// End-to-end check through Client::purgeRoomEvents: three rooms each hold the
+// same 100 events, and only r1 and r2 are listed in the purge map.
+TEST_CASE("PurgeRoomTimelineAction", "[client][room][purge]")
+{
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ // NOTE(review): presumably makeRoom() gives each room a distinct roomId;
+ // the per-room expectations below rely on that.
+ auto r1 = makeRoom(withRoomTimeline(events));
+ auto r2 = makeRoom(withRoomTimeline(events));
+ auto r3 = makeRoom(withRoomTimeline(events));
+
+ auto m = makeClient(withRoom(r1) | withRoom(r2) | withRoom(r3));
+ auto u = makeMockSdkUtil(m);
+ // PurgeRoomTimelineAction is registered with passDown<>() on the mock dispatcher.
+ auto d = u.getMockDispatcher(passDown<PurgeRoomTimelineAction>());
+ auto c = u.getClient(d);
+
+ c.purgeRoomEvents({
+ {r1.roomId, 80},
+ {r2.roomId, 50},
+ }).then([&u](const auto &stat) {
+ REQUIRE(stat);
+ // Stop the io_context so that u.io.run() below returns.
+ u.io.stop();
+ });
+
+ u.io.run();
+ // r1 and r2 are trimmed to their limits; r3 was not in the map and keeps
+ // all 100 events.
+ REQUIRE(c.room(r1.roomId).timelineEventIds().get().size() == 80);
+ REQUIRE(c.room(r2.roomId).timelineEventIds().get().size() == 50);
+ REQUIRE(c.room(r3.roomId).timelineEventIds().get().size() == 100);
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Oct 17, 1:49 AM (6 h, 22 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
522052
Default Alt Text
D237.1760690994.diff (17 KB)
Attached To
Mode
D237: Add purging event support
Attached
Detach File
Event Timeline
Log In to Comment