Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F140039
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Size
48 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/base/basejob.cpp b/src/base/basejob.cpp
index 6ff0d60..3a02972 100644
--- a/src/base/basejob.cpp
+++ b/src/base/basejob.cpp
@@ -1,233 +1,291 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#include <lager/util.hpp>
#include <vector>
#include <tuple>
#include "basejob.hpp"
namespace Kazv
{
BaseJob::Get BaseJob::GET{};
BaseJob::Post BaseJob::POST{};
BaseJob::Put BaseJob::PUT{};
BaseJob::Delete BaseJob::DELETE{};
struct BaseJob::Private
{
Private(std::string serverUrl,
std::string requestUrl,
Method method,
std::string token,
ReturnType returnType,
Body body,
Query query,
Header header,
std::string jobId);
std::string fullRequestUrl;
Method method;
ReturnType returnType;
BytesBody body;
Query query;
Header header;
JsonWrap data;
std::string jobId;
+ std::optional<std::string> queueId;
+ JobQueuePolicy queuePolicy;
};
BaseJob::Private::Private(std::string serverUrl,
std::string requestUrl,
Method method,
std::string token,
ReturnType returnType,
Body body,
Query query,
Header header,
std::string jobId)
: fullRequestUrl(serverUrl + requestUrl)
, method(std::move(method))
, returnType(returnType)
, body()
, query(std::move(query))
, jobId(std::move(jobId))
{
auto header_ = header.get();
if (token.size()) {
header_["Authorization"] = "Bearer " + token;
}
if (isBodyJson(body)) {
JsonBody j = std::get<JsonBody>(std::move(body));
header_["Content-Type"] = "application/json";
this->body = j.get().dump();
} else if (std::holds_alternative<BytesBody>(body)) {
BytesBody b = std::get<BytesBody>(std::move(body));
this->body = b;
}
this->header = header_;
}
BaseJob::BaseJob(std::string serverUrl,
std::string requestUrl,
Method method,
std::string jobId,
std::string token,
ReturnType returnType,
Body body,
Query query,
Header header)
: m_d(std::move(Private(serverUrl, requestUrl, method, token,
returnType, body, query, header, jobId)))
{
}
bool BaseJob::shouldReturnJson() const
{
return m_d->returnType == ReturnType::Json;
};
std::string BaseJob::url() const
{
return m_d->fullRequestUrl;
};
auto BaseJob::requestBody() const -> BytesBody
{
return m_d->body;
}
auto BaseJob::requestHeader() const -> Header
{
return m_d->header;
}
auto BaseJob::returnType() const -> ReturnType
{
return m_d->returnType;
}
auto BaseJob::requestQuery() const -> Query
{
return m_d->query;
}
auto BaseJob::requestMethod() const -> Method
{
return m_d->method;
}
JsonWrap Response::jsonBody() const
{
return std::get<JsonWrap>(body);
}
+ json Response::dataJson(const std::string &key) const
+ {
+ return extraData.get()[key];
+ }
+
+ std::string Response::dataStr(const std::string &key) const
+ {
+ return dataJson(key);
+ }
+
+ std::string Response::jobId() const
+ {
+ return dataStr("-job-id");
+ }
+
+
bool BaseJob::contentTypeMatches(immer::array<std::string> expected, std::string actual)
{
for (const auto &i : expected) {
if (i == "*/*"s) {
return true;
} else {
std::size_t pos = i.find("/*"s);
if (pos != std::string::npos) {
std::string majorType(i.data(), i.data() + pos + 1); // includes `/'
if (actual.find(majorType) == 0) {
return true;
}
} else if (i == actual) {
return true;
}
}
}
return false;
}
Response BaseJob::genResponse(Response r) const
{
auto j = m_d->data.get();
j["-job-id"] = m_d->jobId;
r.extraData = j;
return r;
}
void BaseJob::attachData(JsonWrap j)
{
m_d->data = j;
}
BaseJob BaseJob::withData(JsonWrap j) &&
{
auto ret = BaseJob(std::move(*this));
ret.attachData(j);
return ret;
}
BaseJob BaseJob::withData(JsonWrap j) const &
{
auto ret = BaseJob(*this);
ret.attachData(j);
return ret;
}
+ BaseJob BaseJob::withQueue(std::string id, JobQueuePolicy policy) &&
+ {
+ auto ret = BaseJob(std::move(*this));
+ ret.m_d->queueId = id;
+ ret.m_d->queuePolicy = policy;
+ return ret;
+ }
+
+ BaseJob BaseJob::withQueue(std::string id, JobQueuePolicy policy) const &
+ {
+ auto ret = BaseJob(*this);
+ ret.m_d->queueId = id;
+ ret.m_d->queuePolicy = policy;
+ return ret;
+ }
+
+ json BaseJob::dataJson(const std::string &key) const
+ {
+ return m_d->data.get()[key];
+ }
+
+ std::string BaseJob::dataStr(const std::string &key) const
+ {
+ return dataJson(key);
+ }
+
+ std::string BaseJob::jobId() const
+ {
+ return m_d->jobId;
+ }
+
+ std::optional<std::string> BaseJob::queueId() const
+ {
+ return m_d->queueId;
+ }
+
+ JobQueuePolicy BaseJob::queuePolicy() const
+ {
+ return m_d->queuePolicy;
+ }
std::string Response::errorCode() const
{
// https://matrix.org/docs/spec/client_server/latest#api-standards
if (isBodyJson(body)) {
auto jb = jsonBody();
if (jb.get().contains("errcode")) {
auto code = jb.get()["errcode"].get<std::string>();
if (code != "M_UNKNOWN") {
return code;
}
}
}
return std::to_string(statusCode);
}
std::string Response::errorMessage() const
{
if (isBodyJson(body)) {
auto jb = jsonBody();
if (jb.get().contains("error")) {
auto msg = jb.get()["error"].get<std::string>();
return msg;
}
}
return "";
}
bool operator==(BaseJob a, BaseJob b)
{
return a.m_d->fullRequestUrl == b.m_d->fullRequestUrl
&& a.m_d->method == b.m_d->method
&& a.m_d->returnType == b.m_d->returnType
&& a.m_d->body == b.m_d->body
&& a.m_d->query == b.m_d->query
&& a.m_d->header == b.m_d->header
&& a.m_d->data == b.m_d->data
&& a.m_d->jobId == b.m_d->jobId;
}
bool operator!=(BaseJob a, BaseJob b)
{
return !(a == b);
}
}
diff --git a/src/base/basejob.hpp b/src/base/basejob.hpp
index e120144..f50835b 100644
--- a/src/base/basejob.hpp
+++ b/src/base/basejob.hpp
@@ -1,264 +1,273 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <variant>
#include <tuple>
#include <functional>
#include <string>
#include <map>
#include <future>
#include <immer/map.hpp>
#include <immer/array.hpp>
#include <immer/box.hpp>
#include "types.hpp"
#include "descendent.hpp"
namespace Kazv
{
using Header = immer::box<std::map<std::string, std::string>>;
using BytesBody = Bytes;
using JsonBody = JsonWrap;
struct EmptyBody {};
using Body = std::variant<EmptyBody, JsonBody, BytesBody>;
inline bool operator==(EmptyBody, EmptyBody)
{
return true;
}
inline bool isBodyJson(Body body) {
return std::holds_alternative<JsonBody>(body);
};
+ enum JobQueuePolicy
+ {
+ AlwaysContinue,
+ CancelFutureIfFailed
+ };
+
struct Response {
using StatusCode = int;
StatusCode statusCode;
Body body;
Header header;
JsonWrap extraData;
std::string errorCode() const;
std::string errorMessage() const;
JsonWrap jsonBody() const;
constexpr bool success() const {
return statusCode < 400;
}
- inline json dataJson(const std::string &key) const {
- return extraData.get()[key];
- }
- inline std::string dataStr(const std::string &key) const {
- return dataJson(key);
- }
- inline std::string jobId() const {
- return dataStr("-job-id");
- }
+ json dataJson(const std::string &key) const;
+ std::string dataStr(const std::string &key) const;
+ std::string jobId() const;
};
inline bool operator==(Response a, Response b)
{
return a.statusCode == b.statusCode
&& a.body == b.body
&& a.header == b.header
&& a.extraData == b.extraData;
}
class BaseJob
{
public:
struct Get {};
struct Post {};
struct Put {};
struct Delete {};
using Method = std::variant<Get, Post, Put, Delete>;
static Get GET;
static Post POST;
static Put PUT;
static Delete DELETE;
class Query : public std::vector<std::pair<std::string, std::string>>
{
using BaseT = std::vector<std::pair<std::string, std::string>>;
public:
using BaseT::BaseT;
void add(std::string k, std::string v) {
push_back({k, v});
}
};
using Body = ::Kazv::Body;
using BytesBody = ::Kazv::BytesBody;
using JsonBody = ::Kazv::JsonBody;
using EmptyBody = ::Kazv::EmptyBody;
using Header = ::Kazv::Header;
using Response = ::Kazv::Response;
enum ReturnType {
Json,
Byte,
};
BaseJob(std::string serverUrl,
std::string requestUrl,
Method method,
std::string jobId,
std::string token = {},
ReturnType returnType = ReturnType::Json,
Body body = EmptyBody{},
Query query = {},
Header header = {});
bool shouldReturnJson() const;
std::string url() const;
BytesBody requestBody() const;
Header requestHeader() const;
ReturnType returnType() const;
/// returns the non-encoded query as an array of pairs
Query requestQuery() const;
Method requestMethod() const;
static bool contentTypeMatches(immer::array<std::string> expected, std::string actual);
Response genResponse(Response r) const;
BaseJob withData(JsonWrap j) &&;
BaseJob withData(JsonWrap j) const &;
+ BaseJob withQueue(std::string id, JobQueuePolicy policy = AlwaysContinue) &&;
+ BaseJob withQueue(std::string id, JobQueuePolicy policy = AlwaysContinue) const &;
+
+ json dataJson(const std::string &key) const;
+ std::string dataStr(const std::string &key) const;
+ std::string jobId() const;
+ std::optional<std::string> queueId() const;
+ JobQueuePolicy queuePolicy() const;
+
protected:
void attachData(JsonWrap data);
private:
friend bool operator==(BaseJob a, BaseJob b);
struct Private;
Descendent<Private> m_d;
};
bool operator==(BaseJob a, BaseJob b);
bool operator!=(BaseJob a, BaseJob b);
inline bool operator==(BaseJob::Get, BaseJob::Get) { return true; }
inline bool operator==(BaseJob::Post, BaseJob::Post) { return true; }
inline bool operator==(BaseJob::Put, BaseJob::Put) { return true; }
inline bool operator==(BaseJob::Delete, BaseJob::Delete) { return true; }
namespace detail
{
template<class T>
struct AddToQueryT
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
q.add(name, std::to_string(std::forward<U>(arg)));
}
};
template<>
struct AddToQueryT<std::string>
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
q.add(name, std::forward<U>(arg));
}
};
template<>
struct AddToQueryT<bool>
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
q.add(name, std::forward<U>(arg) ? "true"s : "false"s);
}
};
template<class T>
struct AddToQueryT<immer::array<T>>
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
for (auto v : std::forward<U>(arg)) {
q.add(name, v);
}
}
};
template<>
struct AddToQueryT<json>
{
// https://github.com/nlohmann/json/issues/2040
static void call(BaseJob::Query &q, std::string /* name */, const json &arg) {
// assume v is string type
for (auto [k, v] : arg.items()) {
q.add(k, v);
}
}
};
}
template<class T>
inline void addToQuery(BaseJob::Query &q, std::string name, T &&arg)
{
detail::AddToQueryT<std::decay_t<T>>::call(q, name, std::forward<T>(arg));
}
namespace detail
{
template<class T>
struct AddToQueryIfNeededT
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
if constexpr (detail::hasEmptyMethod(arg)) {
if (! arg.empty()) {
addToQuery(q, name, std::forward<U>(arg));
}
} else {
addToQuery(q, name, std::forward<U>(arg));
}
}
};
template<class T>
struct AddToQueryIfNeededT<std::optional<T>>
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
if (arg.has_value()) {
addToQuery(q, name, std::forward<U>(arg).value());
}
}
};
}
template<class T>
inline void addToQueryIfNeeded(BaseJob::Query &q, std::string name, T &&arg)
{
detail::AddToQueryIfNeededT<std::decay_t<T>>::call(q, name, std::forward<T>(arg));
}
}
diff --git a/src/base/jobinterface.hpp b/src/base/jobinterface.hpp
index 63cf588..b4da446 100644
--- a/src/base/jobinterface.hpp
+++ b/src/base/jobinterface.hpp
@@ -1,39 +1,53 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <functional>
#include "basejob.hpp"
namespace Kazv
{
+ inline const std::string headFailureCancelledErrorCode{"MOE_KAZV_MXC_HEAD_FAILURE_CANCELLED"};
+ inline const std::string headFailureCancelledErrorMsg{"This job is cancelled because one before it failed."};
+ inline const int headFailureCancelledStatusCode{490};
+
struct JobInterface
{
virtual ~JobInterface() = default;
virtual void async(std::function<void()> func) = 0;
- virtual void setTimeout(std::function<void()> func, int ms) = 0;
- virtual void setInterval(std::function<void()> func, int ms) = 0;
+ virtual void setTimeout(std::function<void()> func, int ms,
+ std::optional<std::string> timerId = std::nullopt) = 0;
+ virtual void setInterval(std::function<void()> func, int ms,
+ std::optional<std::string> timerId = std::nullopt) = 0;
+
+ /// cancels all pending timers with timerId
+ virtual void cancel(std::string timerId) = 0;
+
/// makes an async fetch.
- /// callback will not block the current thread.
+ /// callback should not block the current thread.
+ /// if job has a `queueId`, it must be queued and executed only after
+ /// every job before it in the queue `queueId` returned.
+ /// if job has a `queuePolicy` of CancelFutureIfFailed, the failure of
+ /// this job must remove every other job after it in the queue `queueId`.
virtual void submit(BaseJob job, std::function<void(Response)> callback) = 0;
};
}
diff --git a/src/client/client-model.hpp b/src/client/client-model.hpp
index 7a923f9..59b3c01 100644
--- a/src/client/client-model.hpp
+++ b/src/client/client-model.hpp
@@ -1,300 +1,308 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <tuple>
#include <variant>
#include <string>
#include <optional>
#include <lager/context.hpp>
#include <boost/hana.hpp>
#ifndef NDEBUG
#include <lager/debug/cereal/struct.hpp>
#endif
#include <csapi/sync.hpp>
#include <jobinterface.hpp>
#include <eventinterface.hpp>
#include "clientfwd.hpp"
#include "error.hpp"
#include "room/room-model.hpp"
namespace Kazv
{
inline const std::string DEFTXNID{"0"};
enum RoomVisibility
{
Private,
Public,
};
enum CreateRoomPreset
{
PrivateChat,
PublicChat,
TrustedPrivateChat,
};
struct ClientModel
{
std::string serverUrl;
std::string userId;
std::string token;
std::string deviceId;
bool loggedIn;
Error error;
std::string syncToken;
RoomListModel roomList;
immer::map<std::string /* sender */, Event> presence;
immer::map<std::string /* type */, Event> accountData;
std::string nextTxnId{DEFTXNID};
immer::flex_vector<BaseJob> nextJobs;
immer::flex_vector<KazvEvent> nextTriggers;
// helpers
template<class Job>
struct MakeJobT
{
template<class ...Args>
constexpr auto make(Args &&...args) const {
if constexpr (Job::needsAuth()) {
return Job(
serverUrl,
token,
std::forward<Args>(args)...);
} else {
return Job(
serverUrl,
std::forward<Args>(args)...);
}
}
std::string serverUrl;
std::string token;
};
template<class Job>
constexpr auto job() const {
return MakeJobT<Job>{serverUrl, token};
}
inline void addJob(BaseJob j) {
nextJobs = std::move(nextJobs).push_back(std::move(j));
}
inline auto popAllJobs() {
auto jobs = std::move(nextJobs);
nextJobs = DEFVAL;
return jobs;
};
inline void addTrigger(KazvEvent t) {
addTriggers({t});
}
inline void addTriggers(immer::flex_vector<KazvEvent> c) {
nextTriggers = std::move(nextTriggers) + c;
}
inline auto popAllTriggers() {
auto triggers = std::move(nextTriggers);
nextTriggers = DEFVAL;
return triggers;
}
using Action = ClientAction;
using Effect = ClientEffect;
using Result = ClientResult;
static Result update(ClientModel m, Action a);
};
// actions:
struct LoginAction {
std::string serverUrl;
std::string username;
std::string password;
std::optional<std::string> deviceName;
};
struct TokenLoginAction
{
std::string serverUrl;
std::string username;
std::string token;
std::string deviceId;
};
struct LogoutAction {};
struct SyncAction {};
struct PaginateTimelineAction
{
std::string roomId;
std::optional<int> limit;
};
struct SendMessageAction
{
std::string roomId;
Event event;
};
struct SendStateEventAction
{
std::string roomId;
Event event;
};
struct CreateRoomAction
{
using Visibility = RoomVisibility;
using Preset = CreateRoomPreset;
Visibility visibility;
std::optional<std::string> roomAliasName;
std::optional<std::string> name;
std::optional<std::string> topic;
immer::array<std::string> invite;
//immer::array<Invite3pid> invite3pid;
std::optional<std::string> roomVersion;
JsonWrap creationContent;
immer::array<Event> initialState;
std::optional<Preset> preset;
std::optional<bool> isDirect;
JsonWrap powerLevelContentOverride;
};
struct GetRoomStatesAction
{
std::string roomId;
};
struct InviteToRoomAction
{
std::string roomId;
std::string userId;
};
struct JoinRoomByIdAction
{
std::string roomId;
};
struct JoinRoomAction
{
std::string roomIdOrAlias;
immer::array<std::string> serverName;
};
struct LeaveRoomAction
{
std::string roomId;
};
struct ForgetRoomAction
{
std::string roomId;
};
struct SetTypingAction
{
std::string roomId;
bool typing;
std::optional<int> timeoutMs;
};
struct PostReceiptAction
{
std::string roomId;
std::string eventId;
};
struct SetReadMarkerAction
{
std::string roomId;
std::string eventId;
};
+ struct UploadContentAction
+ {
+ Bytes content;
+ std::optional<std::string> filename;
+ std::optional<std::string> contentType;
+ std::string uploadId; // to be used by library users
+ };
+
struct ProcessResponseAction
{
Response response;
};
inline bool operator==(ClientModel a, ClientModel b)
{
return a.serverUrl == b.serverUrl
&& a.userId == b.userId
&& a.token == b.token
&& a.deviceId == b.deviceId
&& a.loggedIn == b.loggedIn
&& a.error == b.error
&& a.syncToken == b.syncToken
&& a.roomList == b.roomList
&& a.presence == b.presence
&& a.accountData == b.accountData
&& a.nextTxnId == b.nextTxnId
&& a.nextJobs == b.nextJobs
&& a.nextTriggers == b.nextTriggers;
}
#ifndef NDEBUG
LAGER_CEREAL_STRUCT(LoginAction);
LAGER_CEREAL_STRUCT(TokenLoginAction);
LAGER_CEREAL_STRUCT(LogoutAction);
LAGER_CEREAL_STRUCT(SyncAction);
LAGER_CEREAL_STRUCT(PaginateTimelineAction);
LAGER_CEREAL_STRUCT(SendMessageAction);
LAGER_CEREAL_STRUCT(SendStateEventAction);
LAGER_CEREAL_STRUCT(CreateRoomAction);
LAGER_CEREAL_STRUCT(GetRoomStatesAction);
LAGER_CEREAL_STRUCT(InviteToRoomAction);
LAGER_CEREAL_STRUCT(JoinRoomByIdAction);
LAGER_CEREAL_STRUCT(JoinRoomAction);
LAGER_CEREAL_STRUCT(LeaveRoomAction);
LAGER_CEREAL_STRUCT(ForgetRoomAction);
LAGER_CEREAL_STRUCT(SetTypingAction);
LAGER_CEREAL_STRUCT(PostReceiptAction);
LAGER_CEREAL_STRUCT(ProcessResponseAction);
LAGER_CEREAL_STRUCT(SetReadMarkerAction);
#endif
template<class Archive>
void serialize(Archive &ar, ClientModel &m, std::uint32_t const /*version*/)
{
ar(m.serverUrl, m.userId, m.token, m.deviceId, m.loggedIn,
m.error,
m.syncToken,
m.roomList,
m.presence,
m.accountData,
m.nextTxnId);
}
}
CEREAL_CLASS_VERSION(Kazv::ClientModel, 0);
diff --git a/src/job/cprjobhandler.cpp b/src/job/cprjobhandler.cpp
index 9f5afba..ae68bfb 100644
--- a/src/job/cprjobhandler.cpp
+++ b/src/job/cprjobhandler.cpp
@@ -1,125 +1,327 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
+#include <algorithm>
+#include <deque>
+
+#include <zug/into_vector.hpp>
+#include <zug/transducer/map.hpp>
#include <cpr/cpr.h>
#include "cprjobhandler.hpp"
#include "debug.hpp"
namespace Kazv
{
+ struct CprJobHandler::Private
+ {
+ CprJobHandler *q;
+ boost::asio::io_context::executor_type executor;
+ using TimerSP = std::shared_ptr<boost::asio::steady_timer>;
+ using TimerSPList = std::vector<TimerSP>;
+ using TimerMap = std::unordered_map<std::optional<std::string>, TimerSPList>;
+ TimerMap timerMap;
+
+ enum Status
+ {
+ Waiting,
+ Running,
+ };
+ using Callback = std::function<void(Response)>;
+ struct JobDesc
+ {
+ BaseJob job;
+ Callback callback;
+ Status status;
+ };
+ using JobQueue = std::deque<JobDesc>;
+ using JobMap = std::unordered_map<std::string, JobQueue>;
+ JobMap jobQueues;
+
+ void submitImpl(BaseJob job, std::function<void(Response)> userCallback);
+
+ void addToQueue(BaseJob job, Callback callback) {
+ boost::asio::post(
+ executor,
+ [=] {
+ // precondition: job has a queueId
+ auto queueId = job.queueId().value();
+
+ jobQueues[queueId].push_back(JobDesc{job, callback, Waiting});
+ });
+ }
+
+ void clearQueue(std::string queueId) {
+ boost::asio::post(
+ executor,
+ [=] { clearQueueImpl(queueId); });
+ }
+
+ void clearQueueImpl(std::string queueId) {
+ Response fakeResponse;
+ fakeResponse.statusCode = headFailureCancelledStatusCode;
+ fakeResponse.body = JsonBody(
+ json{ {"errcode", headFailureCancelledErrorCode},
+ {"error", headFailureCancelledErrorMsg} }
+ );
+ dbgJob << "clearQueueImpl called with " << queueId << std::endl;
+
+ for (auto [job, callback, status] : jobQueues[queueId]) {
+ dbgJob << "this job is " << (status == Waiting ? "Waiting" : "Running") << std::endl;
+ if (status == Waiting) {
+ // Run callback in a different thread, just as in submitImpl().
+ q->async([=] { callback(job.genResponse(fakeResponse)); } );
+ }
+ // if status is Running, the callback is already called
+ }
+ jobQueues[queueId].clear();
+ }
+
+ void popJob(std::string queueId) {
+ boost::asio::post(
+ executor,
+ [=] { popJobImpl(queueId); });
+ }
+
+ void popJobImpl(std::string queueId) {
+ dbgJob << "popJobImpl called with " << queueId << std::endl;
+ if (! jobQueues[queueId].empty()) {
+ dbgJob << "the first job is "
+ << (jobQueues[queueId].front().status == Waiting ? "Waiting" : "Running") << std::endl;
+ jobQueues[queueId].pop_front();
+ }
+ }
+
+ void monitorQueues() {
+ boost::asio::post(
+ executor,
+ [=] {
+ // precondition: job has a queueId
+ for (auto &[queueId, queue] : jobQueues) { // need to change queue
+ dbgJob << "monitoring queue " << queueId << std::endl;
+ if (! queue.empty()) {
+ auto [job, callback, status] = queue.front();
+ dbgJob << "the first job is "
+ << (status == Waiting ? "Waiting" : "Running") << std::endl;
+ if (status == Waiting) {
+ queue.front().status = Running;
+ submitImpl(
+ job,
+ [=](Response r) { // in new thread
+ callback(r);
+
+ if (! r.success() // should be enough for now
+ && job.queuePolicy() == CancelFutureIfFailed) {
+ clearQueue(queueId); // in executor thread
+ } else {
+ popJob(queueId); // in executor thread
+ }
+ });
+ }
+ }
+ }
+ });
+ }
+
+ void addTimerToMap(TimerSP timer, std::optional<std::string> timerId) {
+ boost::asio::post(
+ executor,
+ [=] {
+ timerMap[timerId].push_back(timer);
+ });
+ }
+
+ void clearTimer(TimerSP timer, std::optional<std::string> timerId) {
+ boost::asio::post(
+ executor,
+ [=] {
+ std::remove(timerMap[timerId].begin(), timerMap[timerId].end(), timer);
+ });
+ }
+
+ void cancelAllTimers(std::optional<std::string> timerId) {
+ boost::asio::post(
+ executor,
+ [=] {
+ cancelAllTimersImpl(timerId);
+ });
+ }
+
+ void cancelAllTimersImpl(std::optional<std::string> timerId) {
+ auto timers = timerMap[timerId];
+ timerMap.erase(timerId);
+ for (auto timer : timers) {
+ timer->cancel();
+ }
+ }
+
+ void intervalTimerCallback(TimerSP timer,
+ std::function<void()> func,
+ int ms,
+ const boost::system::error_code &error) {
+ if (! error) {
+ func();
+ auto dur = boost::asio::chrono::milliseconds(ms);
+ timer->expires_at(timer->expiry() + dur);
+ timer->async_wait(
+ [=](const boost::system::error_code &error) {
+ intervalTimerCallback(timer, func, ms, error);
+ });
+ }
+ }
+ };
+
CprJobHandler::CprJobHandler(boost::asio::io_context::executor_type executor)
- : executor(std::move(executor))
+ : m_d(Private{this, std::move(executor), Private::TimerMap{}, Private::JobMap{}})
{
+ setInterval(
+ [=] {
+ m_d->monitorQueues();
+ },
+ 50, // ms
+ "-queue-monitor");
}
CprJobHandler::~CprJobHandler() = default;
void CprJobHandler::async(std::function<void()> func)
{
- std::thread([func=std::move(func), guard=boost::asio::executor_work_guard(executor)]() {
+ std::thread([func=std::move(func), guard=boost::asio::executor_work_guard(m_d->executor)]() {
func();
}).detach();
}
- void CprJobHandler::setTimeout(std::function<void()> func, int ms)
+ void CprJobHandler::setTimeout(std::function<void()> func, int ms, std::optional<std::string> timerId)
{
auto timer=std::make_shared<boost::asio::steady_timer>(
- executor, boost::asio::chrono::milliseconds(ms));
+ m_d->executor, boost::asio::chrono::milliseconds(ms));
+
+ m_d->addTimerToMap(timer, timerId);
+
timer->async_wait(
[=, timer=timer](const boost::system::error_code &error){
if (! error) {
func();
+ this->m_d->clearTimer(timer, timerId);
}
});
}
- void CprJobHandler::setInterval(std::function<void()> func, int ms)
+ void CprJobHandler::setInterval(std::function<void()> func, int ms, std::optional<std::string> timerId)
{
auto dur = boost::asio::chrono::milliseconds(ms);
- auto timer = std::make_shared<boost::asio::steady_timer>(executor, dur);
+ auto timer = std::make_shared<boost::asio::steady_timer>(m_d->executor, dur);
+
+ m_d->addTimerToMap(timer, timerId);
+
timer->async_wait(
- [=, timer=timer](const boost::system::error_code &error) {
- if (!error) {
- func();
- timer->expires_at(timer->expiry() + dur);
- }
+ [=](const boost::system::error_code &error) {
+ m_d->intervalTimerCallback(timer, func, ms, error);
});
}
+ void CprJobHandler::cancel(std::string timerId)
+ {
+ m_d->cancelAllTimers(timerId);
+ }
+
void CprJobHandler::submit(BaseJob job, std::function<void(Response)> userCallback)
+ {
+ if (job.queueId()) {
+ m_d->addToQueue(job, userCallback);
+ } else {
+ m_d->submitImpl(job, userCallback);
+ }
+ }
+
+ void CprJobHandler::Private::submitImpl(BaseJob job, std::function<void(Response)> userCallback)
{
cpr::Url url{job.url()};
cpr::Body body(job.requestBody());
BaseJob::Header origHeader = job.requestHeader();
cpr::Header header(origHeader.get().begin(), origHeader.get().end());
cpr::Parameters params;
BaseJob::Query query = job.requestQuery();
BaseJob::ReturnType returnType = job.returnType();
BaseJob::Method method = job.requestMethod();
if (! query.empty()) {
// from cpr/parameters.cpp
cpr::CurlHolder holder;
for (const auto kv : query) {
std::string key = kv.first;
std::string value = kv.second;
params.AddParameter(cpr::Parameter(std::move(key), std::move(value)), holder);
}
}
auto callback = [returnType](cpr::Response r) -> Response {
Body body = r.text;
if (returnType == BaseJob::ReturnType::Json) {
try {
body = BaseJob::JsonBody(std::move(json::parse(r.text)));
} catch (const json::exception &e) {
// the response is not valid json
dbgJob << "body is not correct json: " << e.what() << std::endl;
}
}
return { r.status_code,
body,
BaseJob::Header(r.header.begin(), r.header.end()),
{} // extraData, will be added in genResponse
};
};
std::shared_future<Response> res = std::visit(lager::visitor{
[=](BaseJob::Get) {
return cpr::GetCallback(callback, url, header, body, params);
},
[=](BaseJob::Post) {
return cpr::PostCallback(callback, url, header, body, params);
},
[=](BaseJob::Put) {
return cpr::PutCallback(callback, url, header, body, params);
},
[=](BaseJob::Delete) {
return cpr::DeleteCallback(callback, url, header, body, params);
}
}, method).share();
- async([=]() {
- userCallback(job.genResponse(res.get()));
- });
+ q->async([=]() {
+ userCallback(job.genResponse(res.get()));
+ });
+ }
+
+ void CprJobHandler::stop()
+ {
+ boost::asio::post(
+ m_d->executor,
+ [=] {
+ auto ids = zug::into_vector(
+ zug::map([](auto i) { return i.first; }),
+ m_d->timerMap);
+ for (auto id : ids) {
+ m_d->cancelAllTimersImpl(id);
+ }
+ m_d->jobQueues.clear();
+ });
}
}
diff --git a/src/job/cprjobhandler.hpp b/src/job/cprjobhandler.hpp
index 9cd12bb..80c4e2f 100644
--- a/src/job/cprjobhandler.hpp
+++ b/src/job/cprjobhandler.hpp
@@ -1,40 +1,48 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <boost/asio.hpp>
#include "jobinterface.hpp"
+#include "descendent.hpp"
namespace Kazv
{
struct CprJobHandler : public JobInterface
{
CprJobHandler(boost::asio::io_context::executor_type executor);
~CprJobHandler() override;
void async(std::function<void()> func) override;
- void setTimeout(std::function<void()> func, int ms) override;
- void setInterval(std::function<void()> func, int ms) override;
+ void setTimeout(std::function<void()> func, int ms,
+ std::optional<std::string> timerId = std::nullopt) override;
+ void setInterval(std::function<void()> func, int ms,
+ std::optional<std::string> timerId = std::nullopt) override;
+ void cancel(std::string timerId) override;
+
void submit(BaseJob job,
std::function<void(Response)> callback) override;
+
+ void stop();
private:
- boost::asio::io_context::executor_type executor;
+ struct Private;
+ Descendent<Private> m_d;
};
}
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index e7253ec..e9c4257 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -1,20 +1,21 @@
include(CTest)
add_executable(kazvtest
testmain.cpp
basejobtest.cpp
cursorutiltest.cpp
client/client-test-util.cpp
client/sync-test.cpp
+ kazvjobtest.cpp
)
target_link_libraries(kazvtest
PRIVATE Catch2::Catch2
PRIVATE kazv
PRIVATE kazvjob
PRIVATE nlohmann_json::nlohmann_json
PRIVATE immer
PRIVATE lager
PRIVATE zug)
diff --git a/src/tests/basejobtest.cpp b/src/tests/basejobtest.cpp
index 10553a0..cfab429 100644
--- a/src/tests/basejobtest.cpp
+++ b/src/tests/basejobtest.cpp
@@ -1,53 +1,54 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#include <catch2/catch.hpp>
#include <iostream>
#include <future>
#include <basejob.hpp>
#include <cprjobhandler.hpp>
#include "tests.hpp"
using namespace Kazv;
TEST_CASE("Base job should fetch correctly", "[basejob]")
{
boost::asio::io_context ioContext;
BaseJob job(TEST_SERVER_URL, "/.well-known/matrix/client", BaseJob::Get{}, "TestJob");
CprJobHandler h(ioContext.get_executor());
h.submit(
job.withData(json{{"test", "bar"}}),
- [](auto r) {
+ [&h](auto r) {
if (r.statusCode == 200) {
REQUIRE( isBodyJson(r.body) );
json j = r.jsonBody().get();
REQUIRE_NOTHROW( (j["m.homeserver"]["base_url"]) );
REQUIRE( (j["m.homeserver"]["base_url"].size() > 0) );
REQUIRE( r.dataStr("test") == "bar" );
}
+ h.stop();
});
ioContext.run();
}
diff --git a/src/tests/kazvjobtest.cpp b/src/tests/kazvjobtest.cpp
new file mode 100644
index 0000000..b9f0e3e
--- /dev/null
+++ b/src/tests/kazvjobtest.cpp
@@ -0,0 +1,147 @@
+/*
+ * Copyright (C) 2020 Tusooa Zhu
+ *
+ * This file is part of libkazv.
+ *
+ * libkazv is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * libkazv is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with libkazv. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <vector>
+#include <iostream>
+
+#include <catch2/catch.hpp>
+#include <cprjobhandler.hpp>
+
+#include "tests.hpp"
+
+using namespace Kazv;
+
+TEST_CASE("setTimeout should behave properly", "[kazvjob]")
+{
+ boost::asio::io_context ioContext;
+ CprJobHandler h(ioContext.get_executor());
+
+ std::vector<int> v;
+
+ h.setTimeout(
+ [&v, &h] {
+ v.push_back(500);
+ h.stop();
+ }, 500);
+
+ h.setTimeout(
+ [&v] {
+ v.push_back(100);
+ }, 100);
+
+ ioContext.run();
+
+ REQUIRE( v.size() == 2 );
+ REQUIRE( v[0] == 100 );
+ REQUIRE( v[1] == 500 );
+}
+
+TEST_CASE("setInterval should behave properly", "[kazvjob]")
+{
+ boost::asio::io_context ioContext;
+ CprJobHandler h(ioContext.get_executor());
+
+ std::vector<int> v;
+
+ h.setInterval(
+ [&v, &h] {
+ v.push_back(50);
+ std::cout << "timer executed" << std::endl;
+ if (v.size() >= 2) {
+ h.stop();
+ }
+ }, 100);
+
+ ioContext.run();
+
+ REQUIRE( v.size() == 2 );
+ REQUIRE( v[0] == 50 );
+ REQUIRE( v[1] == 50 );
+}
+
+TEST_CASE("setInterval can be cancelled", "[kazvjob]")
+{
+ boost::asio::io_context ioContext;
+ CprJobHandler h(ioContext.get_executor());
+
+ std::vector<int> v;
+
+ std::string id{"testTimerId"};
+
+ h.setInterval(
+ [&v, &h, id] {
+ v.push_back(50);
+ std::cout << "timer executed" << std::endl;
+ if (v.size() >= 2) {
+ h.cancel(id);
+ }
+ }, 100, id);
+
+ h.setTimeout(
+ [&h] {
+ h.stop();
+ }, 300);
+
+ ioContext.run();
+
+ REQUIRE( v.size() == 2 );
+ REQUIRE( v[0] == 50 );
+ REQUIRE( v[1] == 50 );
+}
+
+static BaseJob succJob =
+ BaseJob(TEST_SERVER_URL, "/.well-known/matrix/client", BaseJob::Get{}, "TestJob")
+ .withQueue("testjob");
+
+static BaseJob failJob =
+ BaseJob(TEST_SERVER_URL, "/.well-known/jfasjewfn/awejioaewjgjaad/fawre", BaseJob::Get{}, "AnotherTestJob")
+ .withQueue("testjob");
+
+static BaseJob failJobWithCancel =
+ failJob.withQueue("testjob", CancelFutureIfFailed);
+
+TEST_CASE("Job queue should behave properly", "[kazvjob]")
+{
+ std::vector<bool> v;
+
+ std::function<void(Response)> callback =
+ [&v](Response r) {
+ std::cout << "callback called" << std::endl;
+ v.push_back(r.success());
+ };
+
+ boost::asio::io_context ioContext;
+ CprJobHandler h(ioContext.get_executor());
+
+ h.submit(succJob, callback); // true
+ h.submit(failJob, callback); // false
+ h.submit(failJobWithCancel, callback); // false
+ h.submit(succJob, callback); // false (because this is cancelled)
+ h.submit(succJob, [&h](Response r) {
+ h.stop();
+ }); // nothing added
+
+ ioContext.run();
+
+ REQUIRE( v.size() == 4 );
+ REQUIRE( v[0] == true );
+ REQUIRE( v[1] == false );
+ REQUIRE( v[2] == false );
+ REQUIRE( v[3] == false );
+}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 19, 10:59 AM (3 h, 33 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
55115
Default Alt Text
(48 KB)
Attached To
Mode
rL libkazv
Attached
Detach File
Event Timeline
Log In to Comment