Page MenuHomePhorge

D270.1769261648.diff
No OneTemporary

Size
15 KB
Referenced Files
None
Subscribers
None

D270.1769261648.diff

diff --git a/src/db-store.hpp b/src/db-store.hpp
--- a/src/db-store.hpp
+++ b/src/db-store.hpp
@@ -13,6 +13,7 @@
#include <QSqlDatabase>
#include <QCoroTask>
#include <sdk-model.hpp>
+#include <kazvevents.hpp>
/**
* This implements the event storage system using a relational database.
@@ -28,6 +29,7 @@
static inline constexpr std::string dbDirName{"database"};
static inline constexpr std::string dbBaseName{"db.sqlite3"};
static QString getHandleFor(std::string userId, std::string deviceId);
+ static std::optional<Kazv::Event> resultToEvent(bool encrypted, bool decrypted, QString originalJson, QString decryptedJson);
DbStore();
~DbStore();
@@ -50,6 +52,22 @@
*/
QCoro::Task<std::pair<bool, QString>> importAllFrom(const Kazv::SdkModel &model);
+ /**
+ * Save events from a SaveEventsRequested trigger.
+ *
+ * @param s The SaveEventsRequested trigger.
+ */
+ QCoro::Task<std::pair<bool, QString>> saveEvents(Kazv::SaveEventsRequested s);
+
+ /**
+ * Get one event from the database.
+ *
+ * @param roomId The room id of the event.
+ * @param eventId The id of the event.
+ * @return A pair of (Event, isInTimeline) associated with the ids or std::nullopt if it is not found.
+ */
+ QCoro::Task<std::optional<std::pair<Kazv::Event, bool>>> getEventById(const QString &roomId, const QString &eventId);
+
private:
void cleanup();
QCoro::Task<std::pair<bool, QString>> migrate();
diff --git a/src/db-store.cpp b/src/db-store.cpp
--- a/src/db-store.cpp
+++ b/src/db-store.cpp
@@ -11,6 +11,7 @@
#include <QPromise>
#include <QSqlError>
#include <QSqlQuery>
+#include <QSqlRecord>
#include <QCoroFuture>
using namespace Qt::Literals::StringLiterals;
@@ -21,6 +22,29 @@
return QString::fromStdString(userId + "/" + deviceId);
}
+std::optional<Kazv::Event> DbStore::resultToEvent(bool encrypted, bool decrypted, QString originalJson, QString decryptedJson)
+{
+ Event e;
+ try {
+ auto oj = json::parse(std::move(originalJson).toStdString());
+ e = Event(JsonWrap(oj));
+ } catch (const json::parse_error &) {
+ return std::nullopt;
+ }
+ try {
+ if (encrypted) {
+ auto dj = json::parse(std::move(decryptedJson).toStdString());
+ e = std::move(e).setDecryptedJson(
+ JsonWrap(dj),
+ decrypted ? Event::Decrypted : Event::NotDecrypted
+ );
+ }
+ return e;
+ } catch (const json::parse_error &) {
+ return e;
+ }
+}
+
DbStore::DbStore()
: m_thread(new QThread)
, m_obj(new QObject)
@@ -85,6 +109,74 @@
co_return {true, QString()};
}
+QCoro::Task<std::pair<bool, QString>> DbStore::saveEvents(Kazv::SaveEventsRequested s)
+{
+ for (const auto &[roomId, el] : s.timelineEvents) {
+ auto roomIdQs = QString::fromStdString(roomId);
+ for (const auto &event : el) {
+ auto res = co_await importOne(roomIdQs, event, /* isInTimeline = */ true);
+ if (!res.first) {
+ qCWarning(kazvLog) << "Error inserting value:" << res.second;
+ co_return res;
+ }
+ }
+ }
+
+ for (const auto &[roomId, el] : s.nonTimelineEvents) {
+ auto roomIdQs = QString::fromStdString(roomId);
+ for (const auto &event : el) {
+ auto res = co_await importOne(roomIdQs, event, /* isInTimeline = */ false);
+ if (!res.first) {
+ qCWarning(kazvLog) << "Error inserting value:" << res.second;
+ co_return res;
+ }
+ }
+ }
+ co_return {true, QString()};
+}
+
+inline const auto getEventByIdQuery = uR"xxx(SELECT
+ encrypted, decrypted,
+ original_json, decrypted_json, is_in_timeline
+FROM events WHERE
+ room_id = :room_id AND event_id = :event_id
+;)xxx"_s;
+QCoro::Task<std::optional<std::pair<Kazv::Event, bool>>> DbStore::getEventById(const QString &roomId, const QString &eventId)
+{
+ QPromise<std::optional<std::pair<Kazv::Event, bool>>> p;
+ auto fut = p.future();
+ QMetaObject::invokeMethod(m_obj, [this, p=std::move(p), roomId, eventId]() mutable {
+ QSqlQuery q(m_d.value());
+ auto res = q.prepare(getEventByIdQuery);
+ if (!res) {
+ p.addResult(std::nullopt);
+ p.finish();
+ return;
+ }
+ q.bindValue(u":room_id"_s, roomId);
+ q.bindValue(u":event_id"_s, eventId);
+ res = q.exec();
+ if (!res || !q.next()) {
+ p.addResult(std::nullopt);
+ p.finish();
+ return;
+ }
+ auto encrypted = q.value(0).toBool();
+ auto decrypted = q.value(1).toBool();
+ auto originalJson = q.value(2).toString();
+ auto decryptedJson = q.value(3).toString();
+ auto isInTimeline = q.value(4).toBool();
+ auto eventOpt = resultToEvent(encrypted, decrypted, std::move(originalJson), std::move(decryptedJson));
+ auto ret = eventOpt
+ ? std::optional<std::pair<Event, bool>>(
+ {eventOpt.value(), isInTimeline}
+ ) : std::nullopt;
+ p.addResult(ret);
+ p.finish();
+ });
+ co_return co_await fut;
+}
+
bool DbStore::valid() const
{
return m_d.has_value();
@@ -167,27 +259,23 @@
// We invoke the query inside the thread of the database, then use the
// promise-future protocol to pass the result back as a coroutine.
QPromise<std::pair<bool, QString>> p;
- qCDebug(kazvLog) << "DbStore: Async Query:" << qs;
auto fut = p.future();
QMetaObject::invokeMethod(m_obj, [this, p=std::move(p), s=std::move(qs), tf]() mutable {
p.start();
QSqlQuery q(m_d.value());
- qCDebug(kazvLog) << "DbStore: Invoking Query:" << s;
auto res = q.prepare(std::move(s));
if (!res) {
- qCDebug(kazvLog) << "DbStore: Async Query Prepare Failed: Driver:" << q.lastError().driverText();
- qCDebug(kazvLog) << "DbStore: Async Query Prepare Failed: Db:" << q.lastError().databaseText();
+ qCWarning(kazvLog) << "DbStore: Async Query Prepare Failed: Driver:" << q.lastError().driverText();
+ qCWarning(kazvLog) << "DbStore: Async Query Prepare Failed: Db:" << q.lastError().databaseText();
p.addResult({false, q.lastError().text()});
p.finish();
return;
}
tf(q);
- qCDebug(kazvLog) << "DbStore: where:" << q.boundValueNames() << q.boundValues();
res = q.exec();
- qCDebug(kazvLog) << "DbStore: Async Query Done:" << res;
if (!res) {
- qCDebug(kazvLog) << "DbStore: Async Query Failed: Driver:" << q.lastError().driverText();
- qCDebug(kazvLog) << "DbStore: Async Query Failed: Db:" << q.lastError().databaseText();
+ qCWarning(kazvLog) << "DbStore: Async Query Failed: Driver:" << q.lastError().driverText();
+ qCWarning(kazvLog) << "DbStore: Async Query Failed: Db:" << q.lastError().databaseText();
p.addResult({false, q.lastError().text()});
} else {
p.addResult({true, QString()});
diff --git a/src/matrix-sdk.hpp b/src/matrix-sdk.hpp
--- a/src/matrix-sdk.hpp
+++ b/src/matrix-sdk.hpp
@@ -138,6 +138,7 @@
void sessionChanged();
void loadSessionFinished(QString sessionName, MatrixSdk::LoadSessionResult result);
+ void dbResolved(bool success);
public Q_SLOTS:
void login(const QString &userId, const QString &password, const QString &homeserverUrl);
diff --git a/src/matrix-sdk.cpp b/src/matrix-sdk.cpp
--- a/src/matrix-sdk.cpp
+++ b/src/matrix-sdk.cpp
@@ -84,6 +84,19 @@
void resume() { throw std::runtime_error{"not implemented!"}; }
};
+QCoro::Task<std::pair<bool, QString>> setupAndImport(SdkModel model, DbStore *dbStore, std::string userDataDir)
+{
+ auto res = co_await dbStore->setup(
+ userDataDir,
+ model.c().userId,
+ model.c().deviceId
+ );
+ if (!res.first) {
+ co_return res;
+ }
+ co_return co_await dbStore->importAllFrom(model);
+}
+
std::filesystem::path sessionDirForUserAndDeviceId(std::filesystem::path userDataDir, std::string userId, std::string deviceId)
{
auto encodedUserId = encodeBase64(userId, Base64Opts::urlSafe);
@@ -92,6 +105,35 @@
return sessionDir;
}
+struct PendingSaveEvents : public SaveEventsRequested
+{
+ using DataT = immer::map<std::string, EventList>;
+ void add(SaveEventsRequested s)
+ {
+ timelineEvents = addOneType(std::move(timelineEvents), s.timelineEvents);
+ nonTimelineEvents = addOneType(std::move(nonTimelineEvents), s.nonTimelineEvents);
+ }
+
+ static DataT addOneType(DataT orig, DataT addon)
+ {
+ for (const auto &[roomId, el]: addon) {
+ orig = std::move(orig).update(roomId, [&el](EventList origEl) {
+ // This works without explicit dedupe because we do not do parallel
+ // inserts and the insert order is the same as list order.
+ return origEl + el;
+ });
+ }
+ return orig;
+ }
+};
+
+enum DbStatus
+{
+ Pending,
+ Ready,
+ Failed,
+};
+
struct MatrixSdkPrivate
{
MatrixSdkPrivate(MatrixSdk *q, bool testing, std::unique_ptr<KazvSessionLockGuard> lockGuard, std::unique_ptr<DbStore> dbStore);
@@ -113,6 +155,9 @@
SecondaryRootT secondaryRoot;
Client clientOnSecondaryRoot;
NotificationHandler notificationHandler;
+ DbStatus dbStatus;
+ PendingSaveEvents pendingSaveEvents{};
+
void runIoContext() {
thread->start();
}
@@ -128,6 +173,26 @@
}
void serializeClientToFile(Client c);
+
+ void saveOrQueueEvents(SaveEventsRequested s)
+ {
+ if (dbStatus == Ready) {
+ saveEvents(std::move(s));
+ } else if (dbStatus == Pending) {
+ pendingSaveEvents.add(std::move(s));
+ }
+ // Database open failed, do not do anything
+ }
+
+ void saveEvents(SaveEventsRequested s)
+ {
+ dbStore->saveEvents(std::move(s))
+ .then([](const auto &stat) {
+ if (!stat.first) {
+ qCWarning(kazvLog) << "Cannot save events:" << stat.second;
+ }
+ });
+ }
};
// Cleaning up notes:
@@ -260,6 +325,7 @@
, secondaryRoot(sdk.createSecondaryRoot(QtEventLoop{q}))
, clientOnSecondaryRoot(sdk.clientFromSecondaryRoot(secondaryRoot))
, notificationHandler(clientOnSecondaryRoot.notificationHandler())
+ , dbStatus(this->dbStore ? Ready : Pending)
{
obj->moveToThread(thread);
}
@@ -285,6 +351,7 @@
, secondaryRoot(sdk.createSecondaryRoot(QtEventLoop{q}, std::move(model)))
, clientOnSecondaryRoot(sdk.clientFromSecondaryRoot(secondaryRoot))
, notificationHandler(clientOnSecondaryRoot.notificationHandler())
+ , dbStatus(this->dbStore ? Ready : Pending)
{
obj->moveToThread(thread);
}
@@ -295,12 +362,43 @@
{
init();
connect(this, &MatrixSdk::trigger,
- this, [](KazvEvent e) {
- qDebug() << "receiving trigger:";
- if (std::holds_alternative<LoginSuccessful>(e)) {
- qDebug() << "Login successful";
- }
- });
+ this, [this](KazvEvent e) {
+ if (std::holds_alternative<SaveEventsRequested>(e)) {
+ auto s = std::get<SaveEventsRequested>(e);
+ m_d->saveOrQueueEvents(s);
+ }
+ });
+
+ // loginSuccessful will be emitted only for new sessions (not for loaded
+ // sessions),
+ connect(this, &MatrixSdk::loginSuccessful, this, [this]() {
+ if (m_d->dbStore) {
+ qCWarning(kazvLog) << "Trying to replace current db store, ignoring";
+ return;
+ }
+ m_d->dbStore = std::make_unique<DbStore>();
+ // This must be called from the GUI thread
+ auto model = m_d->secondaryRoot.get();
+ setupAndImport(std::move(model), m_d->dbStore.get(), m_d->userDataDir)
+ .then([this](const auto &stat) {
+ if (!stat.first) {
+ qCWarning(kazvLog) << "Cannot set up database:" << stat.second;
+ }
+ Q_EMIT dbResolved(stat.first);
+ });
+ });
+
+ connect(this, &MatrixSdk::dbResolved, this, [this](bool success) {
+ if (!success) {
+ m_d->dbStatus = Failed;
+ } else {
+ qCDebug(kazvLog) << "db loading succeeded";
+ auto pendingSaveEvents = std::move(m_d->pendingSaveEvents);
+ m_d->pendingSaveEvents = {};
+ m_d->saveEvents(std::move(pendingSaveEvents));
+ m_d->dbStatus = Ready;
+ }
+ });
}
void MatrixSdk::init()
@@ -692,18 +790,7 @@
}
auto dbStore = std::make_unique<DbStore>();
- auto stat = QCoro::waitFor([](SdkModel model, DbStore *dbStore, std::string userDataDir) -> QCoro::Task<std::pair<bool, QString>> {
- qCDebug(kazvLog) << "open db:" << userDataDir << model.c().userId << model.c().deviceId;
- auto res = co_await dbStore->setup(
- userDataDir,
- model.c().userId,
- model.c().deviceId
- );
- if (!res.first) {
- co_return res;
- }
- co_return co_await dbStore->importAllFrom(model);
- }(model, dbStore.get(), m_d->userDataDir));
+ auto stat = QCoro::waitFor(setupAndImport(model, dbStore.get(), m_d->userDataDir));
if (!stat.first) {
qCWarning(kazvLog) << "Unable to open db store:" << stat.second;
Q_EMIT loadSessionFinished(sessionName, SessionDeserializeFailed);
diff --git a/src/tests/db-store-test.cpp b/src/tests/db-store-test.cpp
--- a/src/tests/db-store-test.cpp
+++ b/src/tests/db-store-test.cpp
@@ -13,6 +13,7 @@
using namespace Kazv::Factory;
using namespace Kazv;
+using namespace Qt::Literals::StringLiterals;
class DbStoreTest : public QObject
{
@@ -22,6 +23,8 @@
void init();
void testSetup();
void testImportAllFrom();
+ void testResultToEvent();
+ void testSaveEvents();
void cleanup();
private:
@@ -63,5 +66,53 @@
QVERIFY(stat.first);
}
+void DbStoreTest::testResultToEvent()
+{
+ {
+ Event e = makeEvent(withEventType("m.room.encrypted"))
+ .setDecryptedJson(json{{"content", {{"text", "mew"}}}}, Event::Decrypted);
+ QVERIFY(e == DbStore::resultToEvent(
+ e.encrypted(),
+ e.decrypted(),
+ QString::fromStdString(e.originalJson().get().dump()),
+ QString::fromStdString(e.decryptedJson().get().dump())
+ ));
+ }
+ {
+ Event e = makeEvent();
+ QVERIFY(e == DbStore::resultToEvent(
+ e.encrypted(),
+ e.decrypted(),
+ QString::fromStdString(e.originalJson().get().dump()),
+ QVariant().toString()
+ ));
+ }
+}
+
+void DbStoreTest::testSaveEvents()
+{
+ DbStore dbStore;
+ {
+ auto stat = QCoro::waitFor(dbStore.setup(m_tempDir.path().toStdString(), "@userid:example.com", "device1"));
+ QVERIFY(stat.first);
+ }
+ auto e1 = makeEvent();
+ auto e2 = makeEvent();
+ auto e3 = makeEvent();
+ SaveEventsRequested s{
+ {{"!room1:example.com", {e1, e2}}},
+ {{"!room2:example.com", {e3}}},
+ };
+ {
+ auto stat = QCoro::waitFor(dbStore.saveEvents(s));
+ QVERIFY(stat.first);
+ }
+ auto res = QCoro::waitFor(dbStore.getEventById(u"!room1:example.com"_s, QString::fromStdString(e1.id())));
+ QVERIFY(res.has_value() && res.value().first == e1 && res.value().second);
+
+ res = QCoro::waitFor(dbStore.getEventById(u"!room2:example.com"_s, QString::fromStdString(e3.id())));
+ QVERIFY(res.has_value() && res.value().first == e3 && !res.value().second);
+}
+
QTEST_MAIN(DbStoreTest)
#include "db-store-test.moc"

File Metadata

Mime Type
text/plain
Expires
Sat, Jan 24, 5:34 AM (20 h, 32 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1001468
Default Alt Text
D270.1769261648.diff (15 KB)

Event Timeline