Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F33929816
D270.1769258389.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Size
15 KB
Referenced Files
None
Subscribers
None
D270.1769258389.diff
View Options
diff --git a/src/db-store.hpp b/src/db-store.hpp
--- a/src/db-store.hpp
+++ b/src/db-store.hpp
@@ -13,6 +13,7 @@
#include <QSqlDatabase>
#include <QCoroTask>
#include <sdk-model.hpp>
+#include <kazvevents.hpp>
/**
* This implements the event storage system using a relational database.
@@ -28,6 +29,7 @@
static inline constexpr std::string dbDirName{"database"};
static inline constexpr std::string dbBaseName{"db.sqlite3"};
static QString getHandleFor(std::string userId, std::string deviceId);
+ static std::optional<Kazv::Event> resultToEvent(bool encrypted, bool decrypted, QString originalJson, QString decryptedJson);
DbStore();
~DbStore();
@@ -50,6 +52,22 @@
*/
QCoro::Task<std::pair<bool, QString>> importAllFrom(const Kazv::SdkModel &model);
+ /**
+ * Save events from a SaveEventsRequested trigger.
+ *
+ * @param s The SaveEventsRequested trigger.
+ */
+ QCoro::Task<std::pair<bool, QString>> saveEvents(Kazv::SaveEventsRequested s);
+
+ /**
+ * Get one event from the database.
+ *
+ * @param roomId The room id of the event.
+ * @param eventId The id of the event.
+ * @return A pair of (Event, isInTimeline) associated with the ids or std::nullopt if it is not found.
+ */
+ QCoro::Task<std::optional<std::pair<Kazv::Event, bool>>> getEventById(const QString &roomId, const QString &eventId);
+
private:
void cleanup();
QCoro::Task<std::pair<bool, QString>> migrate();
diff --git a/src/db-store.cpp b/src/db-store.cpp
--- a/src/db-store.cpp
+++ b/src/db-store.cpp
@@ -11,6 +11,7 @@
#include <QPromise>
#include <QSqlError>
#include <QSqlQuery>
+#include <QSqlRecord>
#include <QCoroFuture>
using namespace Qt::Literals::StringLiterals;
@@ -21,6 +22,29 @@
return QString::fromStdString(userId + "/" + deviceId);
}
+std::optional<Kazv::Event> DbStore::resultToEvent(bool encrypted, bool decrypted, QString originalJson, QString decryptedJson)
+{
+ Event e;
+ try {
+ auto oj = json::parse(std::move(originalJson).toStdString());
+ e = Event(JsonWrap(oj));
+ } catch (const json::parse_error &) {
+ return std::nullopt;
+ }
+ try {
+ if (encrypted) {
+ auto dj = json::parse(std::move(decryptedJson).toStdString());
+ e = std::move(e).setDecryptedJson(
+ JsonWrap(dj),
+ decrypted ? Event::Decrypted : Event::NotDecrypted
+ );
+ }
+ return e;
+ } catch (const json::parse_error &) {
+ return e;
+ }
+}
+
DbStore::DbStore()
: m_thread(new QThread)
, m_obj(new QObject)
@@ -85,6 +109,74 @@
co_return {true, QString()};
}
+QCoro::Task<std::pair<bool, QString>> DbStore::saveEvents(Kazv::SaveEventsRequested s)
+{
+ for (const auto &[roomId, el] : s.timelineEvents) {
+ auto roomIdQs = QString::fromStdString(roomId);
+ for (const auto &event : el) {
+ auto res = co_await importOne(roomIdQs, event, /* isInTimeline = */ true);
+ if (!res.first) {
+ qCWarning(kazvLog) << "Error inserting value:" << res.second;
+ co_return res;
+ }
+ }
+ }
+
+ for (const auto &[roomId, el] : s.nonTimelineEvents) {
+ auto roomIdQs = QString::fromStdString(roomId);
+ for (const auto &event : el) {
+ auto res = co_await importOne(roomIdQs, event, /* isInTimeline = */ false);
+ if (!res.first) {
+ qCWarning(kazvLog) << "Error inserting value:" << res.second;
+ co_return res;
+ }
+ }
+ }
+ co_return {true, QString()};
+}
+
+inline const auto getEventByIdQuery = uR"xxx(SELECT
+ encrypted, decrypted,
+ original_json, decrypted_json, is_in_timeline
+FROM events WHERE
+ room_id = :room_id AND event_id = :event_id
+;)xxx"_s;
+QCoro::Task<std::optional<std::pair<Kazv::Event, bool>>> DbStore::getEventById(const QString &roomId, const QString &eventId)
+{
+ QPromise<std::optional<std::pair<Kazv::Event, bool>>> p;
+ auto fut = p.future();
+ QMetaObject::invokeMethod(m_obj, [this, p=std::move(p), roomId, eventId]() mutable {
+ QSqlQuery q(m_d.value());
+ auto res = q.prepare(getEventByIdQuery);
+ if (!res) {
+ p.addResult(std::nullopt);
+ p.finish();
+ return;
+ }
+ q.bindValue(u":room_id"_s, roomId);
+ q.bindValue(u":event_id"_s, eventId);
+ res = q.exec();
+ if (!res || !q.next()) {
+ p.addResult(std::nullopt);
+ p.finish();
+ return;
+ }
+ auto encrypted = q.value(0).toBool();
+ auto decrypted = q.value(1).toBool();
+ auto originalJson = q.value(2).toString();
+ auto decryptedJson = q.value(3).toString();
+ auto isInTimeline = q.value(4).toBool();
+ auto eventOpt = resultToEvent(encrypted, decrypted, std::move(originalJson), std::move(decryptedJson));
+ auto ret = eventOpt
+ ? std::optional<std::pair<Event, bool>>(
+ {eventOpt.value(), isInTimeline}
+ ) : std::nullopt;
+ p.addResult(ret);
+ p.finish();
+ });
+ co_return co_await fut;
+}
+
bool DbStore::valid() const
{
return m_d.has_value();
@@ -167,27 +259,23 @@
// We invoke the query inside the thread of the database, then use the
// promise-future protocol to pass the result back as a coroutine.
QPromise<std::pair<bool, QString>> p;
- qCDebug(kazvLog) << "DbStore: Async Query:" << qs;
auto fut = p.future();
QMetaObject::invokeMethod(m_obj, [this, p=std::move(p), s=std::move(qs), tf]() mutable {
p.start();
QSqlQuery q(m_d.value());
- qCDebug(kazvLog) << "DbStore: Invoking Query:" << s;
auto res = q.prepare(std::move(s));
if (!res) {
- qCDebug(kazvLog) << "DbStore: Async Query Prepare Failed: Driver:" << q.lastError().driverText();
- qCDebug(kazvLog) << "DbStore: Async Query Prepare Failed: Db:" << q.lastError().databaseText();
+ qCWarning(kazvLog) << "DbStore: Async Query Prepare Failed: Driver:" << q.lastError().driverText();
+ qCWarning(kazvLog) << "DbStore: Async Query Prepare Failed: Db:" << q.lastError().databaseText();
p.addResult({false, q.lastError().text()});
p.finish();
return;
}
tf(q);
- qCDebug(kazvLog) << "DbStore: where:" << q.boundValueNames() << q.boundValues();
res = q.exec();
- qCDebug(kazvLog) << "DbStore: Async Query Done:" << res;
if (!res) {
- qCDebug(kazvLog) << "DbStore: Async Query Failed: Driver:" << q.lastError().driverText();
- qCDebug(kazvLog) << "DbStore: Async Query Failed: Db:" << q.lastError().databaseText();
+ qCWarning(kazvLog) << "DbStore: Async Query Failed: Driver:" << q.lastError().driverText();
+ qCWarning(kazvLog) << "DbStore: Async Query Failed: Db:" << q.lastError().databaseText();
p.addResult({false, q.lastError().text()});
} else {
p.addResult({true, QString()});
diff --git a/src/matrix-sdk.hpp b/src/matrix-sdk.hpp
--- a/src/matrix-sdk.hpp
+++ b/src/matrix-sdk.hpp
@@ -138,6 +138,7 @@
void sessionChanged();
void loadSessionFinished(QString sessionName, MatrixSdk::LoadSessionResult result);
+ void dbResolved(bool success);
public Q_SLOTS:
void login(const QString &userId, const QString &password, const QString &homeserverUrl);
diff --git a/src/matrix-sdk.cpp b/src/matrix-sdk.cpp
--- a/src/matrix-sdk.cpp
+++ b/src/matrix-sdk.cpp
@@ -84,6 +84,19 @@
void resume() { throw std::runtime_error{"not implemented!"}; }
};
+QCoro::Task<std::pair<bool, QString>> setupAndImport(SdkModel model, DbStore *dbStore, std::string userDataDir)
+{
+ auto res = co_await dbStore->setup(
+ userDataDir,
+ model.c().userId,
+ model.c().deviceId
+ );
+ if (!res.first) {
+ co_return res;
+ }
+ co_return co_await dbStore->importAllFrom(model);
+}
+
std::filesystem::path sessionDirForUserAndDeviceId(std::filesystem::path userDataDir, std::string userId, std::string deviceId)
{
auto encodedUserId = encodeBase64(userId, Base64Opts::urlSafe);
@@ -92,6 +105,35 @@
return sessionDir;
}
+struct PendingSaveEvents : public SaveEventsRequested
+{
+ using DataT = immer::map<std::string, EventList>;
+ void add(SaveEventsRequested s)
+ {
+ timelineEvents = addOneType(std::move(timelineEvents), s.timelineEvents);
+ nonTimelineEvents = addOneType(std::move(nonTimelineEvents), s.nonTimelineEvents);
+ }
+
+ static DataT addOneType(DataT orig, DataT addon)
+ {
+ for (const auto &[roomId, el]: addon) {
+ orig = std::move(orig).update(roomId, [&el](EventList origEl) {
+ // This works without explicit dedupe because we do not do parallel
+ // inserts and the insert order is the same as list order.
+ return origEl + el;
+ });
+ }
+ return orig;
+ }
+};
+
+enum DbStatus
+{
+ Pending,
+ Ready,
+ Failed,
+};
+
struct MatrixSdkPrivate
{
MatrixSdkPrivate(MatrixSdk *q, bool testing, std::unique_ptr<KazvSessionLockGuard> lockGuard, std::unique_ptr<DbStore> dbStore);
@@ -113,6 +155,9 @@
SecondaryRootT secondaryRoot;
Client clientOnSecondaryRoot;
NotificationHandler notificationHandler;
+ DbStatus dbStatus;
+ PendingSaveEvents pendingSaveEvents{};
+
void runIoContext() {
thread->start();
}
@@ -128,6 +173,26 @@
}
void serializeClientToFile(Client c);
+
+ void saveOrQueueEvents(SaveEventsRequested s)
+ {
+ if (dbStatus == Ready) {
+ saveEvents(std::move(s));
+ } else if (dbStatus == Pending) {
+ pendingSaveEvents.add(std::move(s));
+ }
+ // Database open failed, do not do anything
+ }
+
+ void saveEvents(SaveEventsRequested s)
+ {
+ dbStore->saveEvents(std::move(s))
+ .then([](const auto &stat) {
+ if (!stat.first) {
+ qCWarning(kazvLog) << "Cannot save events:" << stat.second;
+ }
+ });
+ }
};
// Cleaning up notes:
@@ -260,6 +325,7 @@
, secondaryRoot(sdk.createSecondaryRoot(QtEventLoop{q}))
, clientOnSecondaryRoot(sdk.clientFromSecondaryRoot(secondaryRoot))
, notificationHandler(clientOnSecondaryRoot.notificationHandler())
+ , dbStatus(this->dbStore ? Ready : Pending)
{
obj->moveToThread(thread);
}
@@ -285,6 +351,7 @@
, secondaryRoot(sdk.createSecondaryRoot(QtEventLoop{q}, std::move(model)))
, clientOnSecondaryRoot(sdk.clientFromSecondaryRoot(secondaryRoot))
, notificationHandler(clientOnSecondaryRoot.notificationHandler())
+ , dbStatus(this->dbStore ? Ready : Pending)
{
obj->moveToThread(thread);
}
@@ -295,12 +362,43 @@
{
init();
connect(this, &MatrixSdk::trigger,
- this, [](KazvEvent e) {
- qDebug() << "receiving trigger:";
- if (std::holds_alternative<LoginSuccessful>(e)) {
- qDebug() << "Login successful";
- }
- });
+ this, [this](KazvEvent e) {
+ if (std::holds_alternative<SaveEventsRequested>(e)) {
+ auto s = std::get<SaveEventsRequested>(e);
+ m_d->saveOrQueueEvents(s);
+ }
+ });
+
+ // loginSuccessful will be emitted only for new sessions (not for loaded
+ // sessions),
+ connect(this, &MatrixSdk::loginSuccessful, this, [this]() {
+ if (m_d->dbStore) {
+ qCWarning(kazvLog) << "Trying to replace current db store, ignoring";
+ return;
+ }
+ m_d->dbStore = std::make_unique<DbStore>();
+ // This must be called from the GUI thread
+ auto model = m_d->secondaryRoot.get();
+ setupAndImport(std::move(model), m_d->dbStore.get(), m_d->userDataDir)
+ .then([this](const auto &stat) {
+ if (!stat.first) {
+ qCWarning(kazvLog) << "Cannot set up database:" << stat.second;
+ }
+ Q_EMIT dbResolved(stat.first);
+ });
+ });
+
+ connect(this, &MatrixSdk::dbResolved, this, [this](bool success) {
+ if (!success) {
+ m_d->dbStatus = Failed;
+ } else {
+ qCDebug(kazvLog) << "db loading succeeded";
+ auto pendingSaveEvents = std::move(m_d->pendingSaveEvents);
+ m_d->pendingSaveEvents = {};
+ m_d->saveEvents(std::move(pendingSaveEvents));
+ m_d->dbStatus = Ready;
+ }
+ });
}
void MatrixSdk::init()
@@ -692,18 +790,7 @@
}
auto dbStore = std::make_unique<DbStore>();
- auto stat = QCoro::waitFor([](SdkModel model, DbStore *dbStore, std::string userDataDir) -> QCoro::Task<std::pair<bool, QString>> {
- qCDebug(kazvLog) << "open db:" << userDataDir << model.c().userId << model.c().deviceId;
- auto res = co_await dbStore->setup(
- userDataDir,
- model.c().userId,
- model.c().deviceId
- );
- if (!res.first) {
- co_return res;
- }
- co_return co_await dbStore->importAllFrom(model);
- }(model, dbStore.get(), m_d->userDataDir));
+ auto stat = QCoro::waitFor(setupAndImport(model, dbStore.get(), m_d->userDataDir));
if (!stat.first) {
qCWarning(kazvLog) << "Unable to open db store:" << stat.second;
Q_EMIT loadSessionFinished(sessionName, SessionDeserializeFailed);
diff --git a/src/tests/db-store-test.cpp b/src/tests/db-store-test.cpp
--- a/src/tests/db-store-test.cpp
+++ b/src/tests/db-store-test.cpp
@@ -13,6 +13,7 @@
using namespace Kazv::Factory;
using namespace Kazv;
+using namespace Qt::Literals::StringLiterals;
class DbStoreTest : public QObject
{
@@ -22,6 +23,8 @@
void init();
void testSetup();
void testImportAllFrom();
+ void testResultToEvent();
+ void testSaveEvents();
void cleanup();
private:
@@ -63,5 +66,53 @@
QVERIFY(stat.first);
}
+void DbStoreTest::testResultToEvent()
+{
+ {
+ Event e = makeEvent(withEventType("m.room.encrypted"))
+ .setDecryptedJson(json{{"content", {{"text", "mew"}}}}, Event::Decrypted);
+ QVERIFY(e == DbStore::resultToEvent(
+ e.encrypted(),
+ e.decrypted(),
+ QString::fromStdString(e.originalJson().get().dump()),
+ QString::fromStdString(e.decryptedJson().get().dump())
+ ));
+ }
+ {
+ Event e = makeEvent();
+ QVERIFY(e == DbStore::resultToEvent(
+ e.encrypted(),
+ e.decrypted(),
+ QString::fromStdString(e.originalJson().get().dump()),
+ QVariant().toString()
+ ));
+ }
+}
+
+void DbStoreTest::testSaveEvents()
+{
+ DbStore dbStore;
+ {
+ auto stat = QCoro::waitFor(dbStore.setup(m_tempDir.path().toStdString(), "@userid:example.com", "device1"));
+ QVERIFY(stat.first);
+ }
+ auto e1 = makeEvent();
+ auto e2 = makeEvent();
+ auto e3 = makeEvent();
+ SaveEventsRequested s{
+ {{"!room1:example.com", {e1, e2}}},
+ {{"!room2:example.com", {e3}}},
+ };
+ {
+ auto stat = QCoro::waitFor(dbStore.saveEvents(s));
+ QVERIFY(stat.first);
+ }
+ auto res = QCoro::waitFor(dbStore.getEventById(u"!room1:example.com"_s, QString::fromStdString(e1.id())));
+ QVERIFY(res.has_value() && res.value().first == e1 && res.value().second);
+
+ res = QCoro::waitFor(dbStore.getEventById(u"!room2:example.com"_s, QString::fromStdString(e3.id())));
+ QVERIFY(res.has_value() && res.value().first == e3 && !res.value().second);
+}
+
QTEST_MAIN(DbStoreTest)
#include "db-store-test.moc"
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Jan 24, 4:39 AM (19 h, 38 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1001468
Default Alt Text
D270.1769258389.diff (15 KB)
Attached To
Mode
D270: Save events to database upon receiving SaveEventsRequested
Attached
Detach File
Event Timeline
Log In to Comment