Page MenuHomePhorge

D237.1760702046.diff
No OneTemporary

Size
17 KB
Referenced Files
None
Subscribers
None

D237.1760702046.diff

diff --git a/src/client/actions/storage.hpp b/src/client/actions/storage.hpp
--- a/src/client/actions/storage.hpp
+++ b/src/client/actions/storage.hpp
@@ -11,4 +11,5 @@
namespace Kazv
{
[[nodiscard]] ClientResult updateClient(ClientModel m, LoadEventsFromStorageAction a);
+ [[nodiscard]] ClientResult updateClient(ClientModel m, PurgeRoomTimelineAction a); // purges the timelines of the rooms listed in the action
}
diff --git a/src/client/actions/storage.cpp b/src/client/actions/storage.cpp
--- a/src/client/actions/storage.cpp
+++ b/src/client/actions/storage.cpp
@@ -41,4 +41,15 @@
return { m, lager::noop };
}
+
+    ClientResult updateClient(ClientModel m, PurgeRoomTimelineAction a) // Reducer: purges the timeline of every room listed in the action.
+    {
+        for (const auto &[roomId, maxToKeep] : a.roomIdToMaxToKeepMap) { // rooms that are not keys of the map are not touched
+            m.roomList = RoomListModel::update(
+                std::move(m.roomList), // hand the old list over instead of copying it
+                UpdateRoomAction{roomId, PurgeEventsAction{maxToKeep}} // NOTE(review): confirm UpdateRoomAction does not create an entry for an unknown roomId
+            );
+        }
+        return { std::move(m), lager::noop }; // m is a by-value parameter: move it into the result; lager::noop means no effect is requested
+    }
}
diff --git a/src/client/client-model.hpp b/src/client/client-model.hpp
--- a/src/client/client-model.hpp
+++ b/src/client/client-model.hpp
@@ -594,6 +594,14 @@
immer::map<std::string, EventList> relatedEvents;
};
+ /// Purge the timelines of the listed rooms, keeping only each room's latest `maxToKeep` events.
+ /// Rooms absent from the map are left untouched. For each room, this takes O(maxToKeep * log(maxToKeep)) time.
+ struct PurgeRoomTimelineAction
+ {
+ /// A map from roomId to maxToKeep, the maximum number of timeline events to keep in that room.
+ immer::map<std::string, std::size_t> roomIdToMaxToKeepMap;
+ };
+
template<class Archive>
void serialize(Archive &ar, ClientModel &m, std::uint32_t const version)
{
diff --git a/src/client/client.hpp b/src/client/client.hpp
--- a/src/client/client.hpp
+++ b/src/client/client.hpp
@@ -603,6 +603,18 @@
*/
BaseJob getRoomIdByAliasJob(std::string roomAlias) const;
+ /**
+ * Purge events in the given rooms, keeping only the latest events of each room.
+ *
+ * The events are removed from the lager store. Each room's timeline will
+ * contain at most the number of events mapped to its room id, but the
+ * `messages` property may contain more in order to maintain the room invariants.
+ * @sa RoomModel
+ * @param roomIdToMaxToKeepMap A map from "room id" to "max number of timeline events to keep."
+ * @return The promise obtained from dispatching a PurgeRoomTimelineAction carrying this map.
+ */
+ PromiseT purgeRoomEvents(immer::map<std::string, std::size_t> roomIdToMaxToKeepMap) const;
+
private:
void syncForever(std::optional<int> retryTime = std::nullopt) const;
diff --git a/src/client/client.cpp b/src/client/client.cpp
--- a/src/client/client.cpp
+++ b/src/client/client.cpp
@@ -500,4 +500,9 @@
{
return Kazv::getRoomIdByAliasJob(clientCursor().get(), roomAlias);
}
+
+ auto Client::purgeRoomEvents(immer::map<std::string, std::size_t> roomIdToMaxToKeepMap) const -> PromiseT
+ {
+ return m_ctx.dispatch(PurgeRoomTimelineAction{roomIdToMaxToKeepMap}); // only dispatches; the purge itself is performed by the reducer for PurgeRoomTimelineAction
+ }
}
diff --git a/src/client/clientfwd.hpp b/src/client/clientfwd.hpp
--- a/src/client/clientfwd.hpp
+++ b/src/client/clientfwd.hpp
@@ -79,6 +79,7 @@
struct ResubmitJobAction;
struct LoadEventsFromStorageAction;
+ struct PurgeRoomTimelineAction;
struct ClientModel;
@@ -145,7 +146,8 @@
ResubmitJobAction,
- LoadEventsFromStorageAction
+ LoadEventsFromStorageAction,
+ PurgeRoomTimelineAction
>;
using ClientEffect = Effect<ClientAction, lager::deps<>>;
diff --git a/src/client/room/room-model.hpp b/src/client/room/room-model.hpp
--- a/src/client/room/room-model.hpp
+++ b/src/client/room/room-model.hpp
@@ -180,6 +180,12 @@
std::string myUserId;
};
+ /// Remove events from the model, retaining only the latest `maxToKeep` events.
+ struct PurgeEventsAction
+ {
+ std::size_t maxToKeep; ///< Maximum number of timeline events to keep; a timeline that is not longer than this is left unchanged.
+ };
+
inline bool operator==(const PendingRoomKeyEvent &a, const PendingRoomKeyEvent &b)
{
return a.txnId == b.txnId && a.messages == b.messages;
@@ -220,6 +226,16 @@
*/
auto sortKeyForTimelineEvent(Event e) -> std::tuple<Timestamp, std::string>;
+ /**
+ * The model to store information about a room.
+ *
+ * Room invariants:
+ * Any event in timeline is in messages.
+ * Any event in undecryptedEvents is in messages.
+ * Any event in unreadNotificationEventIds is in messages.
+ * Any relater (i.e. child) event in reverseEventRelationships is in messages.
+ * localReadMarker (if not empty) is in messages.
+ */
struct RoomModel
{
using Membership = RoomMembership;
@@ -335,6 +351,12 @@
void recalculateUndecryptedEvents();
+        /**
+         * Check if the invariants in the model are satisfied: the event ids in timeline, undecryptedEvents, unreadNotificationEventIds, the relater lists of reverseEventRelationships, and a non-empty localReadMarker must all be in messages.
+         * @return true iff the invariants are satisfied. This is a pure query, so the result must not be discarded.
+         */
+        [[nodiscard]] bool checkInvariants() const;
+
using Action = std::variant<
AddStateEventsAction,
MaybeAddStateEventsAction,
@@ -356,7 +378,8 @@
UpdateInvitedMemberCountAction,
AddLocalNotificationsAction,
RemoveReadLocalNotificationsAction,
- UpdateLocalReadMarkerAction
+ UpdateLocalReadMarkerAction,
+ PurgeEventsAction
>;
static RoomModel update(RoomModel r, Action a);
diff --git a/src/client/room/room-model.cpp b/src/client/room/room-model.cpp
--- a/src/client/room/room-model.cpp
+++ b/src/client/room/room-model.cpp
@@ -417,6 +417,44 @@
r.localReadMarker = a.localReadMarker;
auto next = RoomModel::update(std::move(r), RemoveReadLocalNotificationsAction{a.myUserId});
return next;
+ },
+ [&](PurgeEventsAction a) { // Keep the last maxToKeep timeline events; the read marker and unread-notification events stay in messages.
+ if (r.timeline.size() <= a.maxToKeep) { // already within the limit: nothing to purge
+ return r;
+ }
+ auto numToDrop = r.timeline.size() - a.maxToKeep; // cannot underflow: size > maxToKeep was established above
+ auto keepEvents = intoImmer( // full events of the surviving timeline entries, looked up before r.messages is cleared
+ EventList{},
+ zug::map([&r](const auto &eventId) {
+ return r.messages[eventId];
+ }),
+ std::move(r.timeline).drop(numToDrop) // skip the first numToDrop entries; r.timeline is reassigned below
+ );
+ auto origMessages = r.messages; // keep the old messages for the lookups below
+ r.reverseEventRelationships = {}; // clear these containers; presumably the two update() calls below repopulate them
+ r.timeline = {};
+ r.messages = {};
+ r.undecryptedEvents = {};
+
+ auto msgs = intoImmer( // events kept in messages regardless of the timeline: read marker (if any), then unread notifications
+ r.localReadMarker.empty() ? EventList{} : EventList{origMessages[r.localReadMarker]},
+ zug::filter([l=r.localReadMarker](const auto &eventId) { // the read marker was already seeded above; do not add it twice
+ return eventId != l;
+ })
+ | zug::map([&origMessages](const auto &eventId) {
+ return origMessages[eventId];
+ }),
+ r.unreadNotificationEventIds
+ );
+
+ auto next = update(std::move(r), AddMessagesAction{msgs}); // first re-add the extra events via AddMessagesAction
+
+ return update(std::move(next), AddToTimelineAction{ // then re-add the kept events via AddToTimelineAction
+ keepEvents,
+ std::nullopt,
+ std::nullopt,
+ std::nullopt,
+ });
+ }
);
}
@@ -709,4 +747,24 @@
return kv.second;
}), messages));
}
+
+    bool RoomModel::checkInvariants() const
+    {
+        // An id is "known" when it is a key of messages; every container checked below may only hold known ids.
+        const auto known = [this](const std::string &id) { return messages.count(id) != 0; };
+        const auto allKnown = [&known](const auto &ids) { return immer::all_of(ids, known); };
+        if (!allKnown(timeline) || !allKnown(unreadNotificationEventIds)) {
+            return false;
+        }
+        for (const auto &[key, ids] : undecryptedEvents) {
+            if (!allKnown(ids)) { return false; }
+        }
+        if (!localReadMarker.empty() && !known(localReadMarker)) { return false; }
+        for (const auto &[relatee, byRelType] : reverseEventRelationships) {
+            for (const auto &[relType, relaters] : byRelType) {
+                if (!allKnown(relaters)) { return false; } // only the relater ids are checked, not the map keys
+            }
+        }
+        return true;
+    }
}
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -74,6 +74,7 @@
client/room/local-echo-test.cpp
client/room/event-relationships-test.cpp
client/room/member-membership-test.cpp
+ client/room/purge-test.cpp
client/push-rules-desc-test.cpp
client/notification-handler-test.cpp
client/validator-test.cpp
diff --git a/src/tests/client/action-mock-utils.hpp b/src/tests/client/action-mock-utils.hpp
--- a/src/tests/client/action-mock-utils.hpp
+++ b/src/tests/client/action-mock-utils.hpp
@@ -14,6 +14,11 @@
#include <zug/transducer/filter.hpp>
#include <store/store.hpp>
#include <client/client.hpp>
+#include <client/sdk.hpp>
+#include <cprjobhandler.hpp>
+#include <lagerstoreeventemitter.hpp>
+#include <asio-promise-handler.hpp>
+#include <lager/event_loop/boost_asio.hpp>
template<class Action>
struct PassDownTag
@@ -307,3 +312,48 @@
{
return ContextT(std::forward<Func>(func), ph, lager::deps<>{});
};
+
+struct MockSdkUtil // Test fixture bundling an io_context, the handlers built on its executor, and an Sdk created from a given ClientModel.
+{
+    using PH = Kazv::AsioPromiseHandler<boost::asio::io_context::executor_type>;
+    using ContextT = Kazv::Context<Kazv::SdkAction>;
+    using SdkT = decltype(Kazv::makeSdk( // the type makeSdk returns for the argument types used in the constructor below
+        Kazv::SdkModel{},
+        Kazv::detail::declref<Kazv::CprJobHandler>(),
+        Kazv::detail::declref<Kazv::LagerStoreEventEmitter>(),
+        Kazv::detail::declref<PH>(),
+        zug::identity));
+    MockSdkUtil(Kazv::ClientModel m) // m is used to build the SdkModel the sdk starts from
+        : io()
+        , jh{io.get_executor()}
+        , ee(lager::with_boost_asio_event_loop{io.get_executor()})
+        , ph{io.get_executor()}
+        , sdk(Kazv::makeSdk(Kazv::SdkModel{std::move(m)}, jh, ee, ph, zug::identity)) // move the by-value model instead of copying it
+        , ctx(sdk.context())
+    {}
+
+    template<class ...Handlers>
+    auto getMockDispatcher(Handlers &&...handlers) // forwards to the free ::getMockDispatcher with this fixture's promise handler and context
+    {
+        return ::getMockDispatcher(ph, ctx, std::forward<Handlers>(handlers)...);
+    }
+
+    template<class MD>
+    auto getClient(MD &d) // builds a Client from a mock context made of ph and d, plus the sdk's own context
+    {
+        auto mockContext = getMockContext(ph, d);
+        return Kazv::Client(Kazv::Client::InEventLoopTag{}, mockContext, sdk.context());
+    }
+
+    boost::asio::io_context io; // declared first: members are initialized in declaration order and jh, ee, ph all use io.get_executor()
+    Kazv::CprJobHandler jh;
+    Kazv::LagerStoreEventEmitter ee;
+    PH ph;
+    SdkT sdk; // must come after jh, ee and ph, which are passed to makeSdk
+    ContextT ctx; // must come after sdk: initialized from sdk.context()
+};
+
+inline MockSdkUtil makeMockSdkUtil(Kazv::ClientModel m) // inline: defined in a header, so every including test file would otherwise emit its own definition
+{
+    // Returned as a prvalue so that C++17 guaranteed copy elision applies; NOTE(review): MockSdkUtil owns an io_context and is presumably neither copyable nor movable.
+    return MockSdkUtil(std::move(m));
diff --git a/src/tests/client/room/purge-test.cpp b/src/tests/client/room/purge-test.cpp
new file mode 100644
--- /dev/null
+++ b/src/tests/client/room/purge-test.cpp
@@ -0,0 +1,169 @@
+/*
+ * This file is part of libkazv.
+ * SPDX-FileCopyrightText: 2025 tusooa <tusooa@kazv.moe>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#include <libkazv-config.hpp>
+#include "client-test-util.hpp"
+#include "action-mock-utils.hpp"
+#include "factory.hpp"
+#include <client.hpp>
+#include <catch2/catch_test_macros.hpp>
+#include <zug/transducer/repeat.hpp>
+#include <zug/transducer/enumerate.hpp>
+
+using namespace Kazv;
+using namespace Kazv::Factory;
+
+// Builds an "m.room.encrypted" test event; only session_id varies between calls.
+inline static Event makeEncryptedEvent(std::string sessionId = "xxx"s)
+{
+    auto content = json{
+        {"algorithm", "m.megolm.v1.aes-sha2"},
+        {"sender_key", "yyy"},
+        {"device_id", "device1"},
+        {"session_id", sessionId},
+        {"ciphertext", "AAA"},
+    };
+    auto modifiers = withEventType("m.room.encrypted") | withEventContent(content);
+    return makeEvent(modifiers);
+}
+
+TEST_CASE("PurgeEventsAction", "[client][room][purge]") // Each section builds a 100-event room and checks the model after PurgeEventsAction.
+{
+ WHEN("simple purging") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ }
+
+ WHEN("nothing to purge") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ auto next = RoomModel::update(r, PurgeEventsAction{120}); // limit above the timeline size: the room must stay as it is
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 100);
+ }
+
+ WHEN("rewrites undecryptedEvents") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::enumerate
+ | zug::map([](const auto &i, const auto &) { // odd indices: plain events; even indices: encrypted events with a per-index session id
+ return i % 2
+ ? makeEvent()
+ : makeEncryptedEvent("sess" + std::to_string(i));
+ })
+ );
+ RoomModel r = makeRoom(withRoomEncrypted(true) | withRoomTimeline(events));
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ REQUIRE(next.undecryptedEvents.size() == 10); // 10 of the 20 kept events are encrypted; presumably one entry per session id
+ }
+
+ WHEN("keep unreadNotificationEventIds") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ r.unreadNotificationEventIds = {
+ events[0].id(),
+ events[25].id(),
+ events[70].id(),
+ events[81].id(),
+ events[95].id(),
+ };
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ REQUIRE(next.messages.size() == 23); // 20 timeline events + events 0, 25 and 70; 81 and 95 are expected to be among the kept 20
+ REQUIRE(next.messages.count(events[0].id()));
+ REQUIRE(next.messages.count(events[25].id()));
+ REQUIRE(next.messages.count(events[70].id()));
+ }
+
+ WHEN("keep localReadMarker") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ r.localReadMarker = events[70].id();
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ REQUIRE(next.messages.size() == 21); // 20 timeline events + the read marker event
+ REQUIRE(next.messages.count(events[70].id()));
+ }
+
+ WHEN("rewrites relationships") {
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ events = events.set(10, makeEvent(withEventId(events[10].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[9].id())));
+ events = events.set(26, makeEvent(withEventId(events[26].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[8].id())));
+ events = events.set(85, makeEvent(withEventId(events[85].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[9].id())));
+ events = events.set(88, makeEvent(withEventId(events[88].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[20].id())));
+ events = events.set(90, makeEvent(withEventId(events[90].id()) | withEventRelationship("moe.kazv.mxc.some-type", events[9].id())));
+ RoomModel r = makeRoom(withRoomTimeline(events));
+ r.localReadMarker = events[70].id();
+ auto next = RoomModel::update(r, PurgeEventsAction{20});
+ REQUIRE(next.checkInvariants());
+ REQUIRE(next.timeline.size() == 20);
+ REQUIRE(next.messages.size() == 21); // 20 timeline events + the read marker event
+ REQUIRE(next.messages.count(events[70].id()));
+ auto expected = RoomModel::ReverseEventRelationshipMap{ // only relaters 85, 88 and 90 are expected to remain; relaters 10 and 26 are purged
+ {events[9].id(), {{"moe.kazv.mxc.some-type", {events[85].id(), events[90].id()}}}},
+ {events[20].id(), {{"moe.kazv.mxc.some-type", {events[88].id()}}}},
+ };
+ REQUIRE(next.reverseEventRelationships == expected);
+ }
+}
+
+TEST_CASE("PurgeRoomTimelineAction", "[client][room][purge]") // Purges two of three rooms through Client::purgeRoomEvents and checks all three timelines.
+{
+ auto events = intoImmer(
+ EventList{},
+ zug::repeatn(100, 0)
+ | zug::map([](const auto &) { return makeEvent(); })
+ );
+ auto r1 = makeRoom(withRoomTimeline(events));
+ auto r2 = makeRoom(withRoomTimeline(events));
+ auto r3 = makeRoom(withRoomTimeline(events));
+
+ auto m = makeClient(withRoom(r1) | withRoom(r2) | withRoom(r3));
+ auto u = makeMockSdkUtil(m);
+ auto d = u.getMockDispatcher(passDown<PurgeRoomTimelineAction>()); // presumably lets this action through to the real store; confirm against passDown
+ auto c = u.getClient(d);
+
+ c.purgeRoomEvents({
+ {r1.roomId, 80},
+ {r2.roomId, 50},
+ }).then([&u](const auto &stat) {
+ REQUIRE(stat);
+ u.io.stop(); // stop the io_context so that u.io.run() below returns
+ });
+
+ u.io.run();
+ REQUIRE(c.room(r1.roomId).timelineEventIds().get().size() == 80);
+ REQUIRE(c.room(r2.roomId).timelineEventIds().get().size() == 50);
+ REQUIRE(c.room(r3.roomId).timelineEventIds().get().size() == 100); // r3 was not in the map, so it keeps all 100 events
+}

File Metadata

Mime Type
text/plain
Expires
Fri, Oct 17, 4:54 AM (9 h, 26 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
522052
Default Alt Text
D237.1760702046.diff (17 KB)

Event Timeline