Page MenuHomePhorge

No OneTemporary

Size
48 KB
Referenced Files
None
Subscribers
None
diff --git a/src/base/basejob.cpp b/src/base/basejob.cpp
index 6ff0d60..3a02972 100644
--- a/src/base/basejob.cpp
+++ b/src/base/basejob.cpp
@@ -1,233 +1,291 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#include <lager/util.hpp>
#include <vector>
#include <tuple>
#include "basejob.hpp"
namespace Kazv
{
/// Out-of-class definitions of the static method-tag objects declared in
/// BaseJob (GET, POST, PUT, DELETE). Each one is value-initialized.
BaseJob::Get BaseJob::GET{};
BaseJob::Post BaseJob::POST{};
BaseJob::Put BaseJob::PUT{};
BaseJob::Delete BaseJob::DELETE{};
/// Request state shared by every BaseJob; BaseJob keeps it behind m_d.
struct BaseJob::Private
{
    Private(std::string serverUrl,
            std::string requestUrl,
            Method method,
            std::string token,
            ReturnType returnType,
            Body body,
            Query query,
            Header header,
            std::string jobId);

    /// serverUrl immediately followed by requestUrl
    std::string fullRequestUrl;
    Method method;
    ReturnType returnType;
    /// request body in its byte form (a JSON body is dumped by the constructor)
    BytesBody body;
    Query query;
    Header header;
    /// extra data attached with attachData(); genResponse() copies it into responses
    JsonWrap data;
    std::string jobId;
+    /// queue this job belongs to; empty until withQueue() is called
+    std::optional<std::string> queueId;
+    /// The constructor does not set this member, so give it the same default
+    /// that withQueue() uses; otherwise queuePolicy() and copies of the job
+    /// would read an indeterminate value for jobs that were never queued.
+    JobQueuePolicy queuePolicy{AlwaysContinue};
};
/// Builds the private request state.
/// The full URL is serverUrl followed by requestUrl. A non-empty token adds
/// an `Authorization` header, a JSON body is dumped to text and adds a
/// `Content-Type` header, a bytes body is stored as given, and any other
/// body leaves the stored body empty.
BaseJob::Private::Private(std::string serverUrl,
                          std::string requestUrl,
                          Method method,
                          std::string token,
                          ReturnType returnType,
                          Body body,
                          Query query,
                          Header header,
                          std::string jobId)
    : fullRequestUrl(serverUrl + requestUrl)
    , method(std::move(method))
    , returnType(returnType)
    , body()
    , query(std::move(query))
    , jobId(std::move(jobId))
{
    // Edit a copy of the header map; it is stored back once complete.
    auto headerMap = header.get();

    if (! token.empty()) {
        headerMap["Authorization"] = "Bearer " + token;
    }

    if (std::holds_alternative<JsonBody>(body)) {
        const JsonBody jsonBody = std::get<JsonBody>(std::move(body));
        headerMap["Content-Type"] = "application/json";
        this->body = jsonBody.get().dump();
    } else if (std::holds_alternative<BytesBody>(body)) {
        this->body = std::get<BytesBody>(std::move(body));
    }

    this->header = headerMap;
}
/// Creates a job. Every argument is handed over to the Private state object,
/// which is where the request is actually assembled.
BaseJob::BaseJob(std::string serverUrl,
                 std::string requestUrl,
                 Method method,
                 std::string jobId,
                 std::string token,
                 ReturnType returnType,
                 Body body,
                 Query query,
                 Header header)
    : m_d(Private(std::move(serverUrl),
                  std::move(requestUrl),
                  std::move(method),
                  std::move(token),
                  returnType,
                  std::move(body),
                  std::move(query),
                  std::move(header),
                  std::move(jobId)))
{
}
/// Whether this job was created with ReturnType::Json.
bool BaseJob::shouldReturnJson() const
{
    const bool wantsJson = (m_d->returnType == ReturnType::Json);
    return wantsJson;
}
/// The full request URL (server URL followed by the request path).
std::string BaseJob::url() const
{
    const auto &fullUrl = m_d->fullRequestUrl;
    return fullUrl;
}
/// Returns a copy of the request body in its byte form, as stored by the
/// constructor (a JSON body has already been dumped to text there).
auto BaseJob::requestBody() const -> BytesBody
{
return m_d->body;
}
/// Returns the request headers, including any header the constructor added
/// (`Authorization`, `Content-Type`).
auto BaseJob::requestHeader() const -> Header
{
return m_d->header;
}
/// Returns the ReturnType this job was constructed with.
auto BaseJob::returnType() const -> ReturnType
{
return m_d->returnType;
}
/// Returns a copy of the query given at construction (a list of key/value pairs).
auto BaseJob::requestQuery() const -> Query
{
return m_d->query;
}
/// Returns the method tag (Get, Post, Put or Delete) of this job.
auto BaseJob::requestMethod() const -> Method
{
return m_d->method;
}
/// Returns the body as a JsonWrap.
/// The body is read with std::get, so this throws std::bad_variant_access
/// when the body does not hold JSON; check with isBodyJson() first.
JsonWrap Response::jsonBody() const
{
return std::get<JsonWrap>(body);
}
+ json Response::dataJson(const std::string &key) const
+ {
+ return extraData.get()[key];
+ }
+
+ std::string Response::dataStr(const std::string &key) const
+ {
+ return dataJson(key);
+ }
+
+ std::string Response::jobId() const
+ {
+ return dataStr("-job-id");
+ }
+
+
/// Tests `actual` against every pattern in `expected`.
/// A pattern matches when it is exactly `*/*`; when it contains `/*` and
/// `actual` begins with everything up to and including that `/`; or, for a
/// pattern without `/*`, when it is equal to `actual`.
/// @return true as soon as one pattern matches, false otherwise.
bool BaseJob::contentTypeMatches(immer::array<std::string> expected, std::string actual)
{
    for (const auto &pattern : expected) {
        if (pattern == "*/*") {
            return true;
        }

        const std::size_t wildcardPos = pattern.find("/*");
        if (wildcardPos == std::string::npos) {
            // no wildcard: only an exact match counts
            if (pattern == actual) {
                return true;
            }
            continue;
        }

        // keep the `/' so that only the very same major type matches
        const std::string majorPrefix = pattern.substr(0, wildcardPos + 1);
        if (actual.compare(0, majorPrefix.size(), majorPrefix) == 0) {
            return true;
        }
    }
    return false;
}
/// Returns `r` with its extraData replaced by this job's attached data plus
/// a `-job-id` entry holding the job id.
Response BaseJob::genResponse(Response r) const
{
    auto extra = m_d->data.get();
    extra["-job-id"] = m_d->jobId;
    r.extraData = std::move(extra);
    return r;
}
/// Replaces the data attached to this job with `j`.
/// genResponse() copies the attached data into each response's extraData.
void BaseJob::attachData(JsonWrap j)
{
m_d->data = j;
}
/// Rvalue overload: moves this job into a new one and attaches `j` to it.
BaseJob BaseJob::withData(JsonWrap j) &&
{
    BaseJob moved(std::move(*this));
    moved.attachData(std::move(j));
    return moved;
}
/// Lvalue overload: leaves this job untouched and returns a copy with `j` attached.
BaseJob BaseJob::withData(JsonWrap j) const &
{
    BaseJob copy(*this);
    copy.attachData(std::move(j));
    return copy;
}
+ BaseJob BaseJob::withQueue(std::string id, JobQueuePolicy policy) &&
+ {
+ auto ret = BaseJob(std::move(*this));
+ ret.m_d->queueId = id;
+ ret.m_d->queuePolicy = policy;
+ return ret;
+ }
+
+ BaseJob BaseJob::withQueue(std::string id, JobQueuePolicy policy) const &
+ {
+ auto ret = BaseJob(*this);
+ ret.m_d->queueId = id;
+ ret.m_d->queuePolicy = policy;
+ return ret;
+ }
+
+ json BaseJob::dataJson(const std::string &key) const
+ {
+ return m_d->data.get()[key];
+ }
+
+ std::string BaseJob::dataStr(const std::string &key) const
+ {
+ return dataJson(key);
+ }
+
+ std::string BaseJob::jobId() const
+ {
+ return m_d->jobId;
+ }
+
+ std::optional<std::string> BaseJob::queueId() const
+ {
+ return m_d->queueId;
+ }
+
+ JobQueuePolicy BaseJob::queuePolicy() const
+ {
+ return m_d->queuePolicy;
+ }
/// Returns the error code of this response.
/// When the body is JSON and has a string `errcode` other than `M_UNKNOWN`,
/// that value is returned; in every other case the HTTP status code is
/// returned as text.
std::string Response::errorCode() const
{
    // https://matrix.org/docs/spec/client_server/latest#api-standards
    if (isBodyJson(body)) {
        const auto jb = jsonBody();
        const auto &j = jb.get();
        const auto it = j.find("errcode");
        // The body comes from the server: only trust `errcode` if it really
        // is a string, instead of letting get<std::string>() throw.
        if (it != j.end() && it->is_string()) {
            auto code = it->get<std::string>();
            if (code != "M_UNKNOWN") {
                return code;
            }
        }
    }
    return std::to_string(statusCode);
}
/// Returns the `error` text of a JSON body, or an empty string when the
/// body is not JSON or carries no string `error` entry.
std::string Response::errorMessage() const
{
    if (isBodyJson(body)) {
        const auto jb = jsonBody();
        const auto &j = jb.get();
        const auto it = j.find("error");
        // A non-string `error` would make get<std::string>() throw; treat it
        // like a missing message instead.
        if (it != j.end() && it->is_string()) {
            return it->get<std::string>();
        }
    }
    return "";
}
/// Two jobs are equal when URL, method, return type, body, query, header,
/// attached data and job id are all equal, compared in that order.
bool operator==(BaseJob a, BaseJob b)
{
    return std::tie(a.m_d->fullRequestUrl,
                    a.m_d->method,
                    a.m_d->returnType,
                    a.m_d->body,
                    a.m_d->query,
                    a.m_d->header,
                    a.m_d->data,
                    a.m_d->jobId)
        == std::tie(b.m_d->fullRequestUrl,
                    b.m_d->method,
                    b.m_d->returnType,
                    b.m_d->body,
                    b.m_d->query,
                    b.m_d->header,
                    b.m_d->data,
                    b.m_d->jobId);
}
/// Negation of operator==(BaseJob, BaseJob).
bool operator!=(BaseJob a, BaseJob b)
{
return !(a == b);
}
}
diff --git a/src/base/basejob.hpp b/src/base/basejob.hpp
index e120144..f50835b 100644
--- a/src/base/basejob.hpp
+++ b/src/base/basejob.hpp
@@ -1,264 +1,273 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <variant>
#include <tuple>
#include <functional>
#include <string>
#include <map>
#include <future>
#include <immer/map.hpp>
#include <immer/array.hpp>
#include <immer/box.hpp>
#include "types.hpp"
#include "descendent.hpp"
namespace Kazv
{
/// HTTP headers: a boxed map from header name to header value.
using Header = immer::box<std::map<std::string, std::string>>;
/// A body made of raw bytes.
using BytesBody = Bytes;
/// A body made of JSON.
using JsonBody = JsonWrap;
/// Tag type for "no body".
struct EmptyBody {};
/// A body is exactly one of: nothing, JSON, or bytes.
using Body = std::variant<EmptyBody, JsonBody, BytesBody>;
inline bool operator==(EmptyBody, EmptyBody)
{
return true;
}
inline bool isBodyJson(Body body) {
return std::holds_alternative<JsonBody>(body);
};
+ enum JobQueuePolicy
+ {
+ AlwaysContinue,
+ CancelFutureIfFailed
+ };
+
struct Response {
using StatusCode = int;
StatusCode statusCode;
Body body;
Header header;
JsonWrap extraData;
std::string errorCode() const;
std::string errorMessage() const;
JsonWrap jsonBody() const;
constexpr bool success() const {
return statusCode < 400;
}
- inline json dataJson(const std::string &key) const {
- return extraData.get()[key];
- }
- inline std::string dataStr(const std::string &key) const {
- return dataJson(key);
- }
- inline std::string jobId() const {
- return dataStr("-job-id");
- }
+ json dataJson(const std::string &key) const;
+ std::string dataStr(const std::string &key) const;
+ std::string jobId() const;
};
/// Responses are equal when status code, body, header and extra data are
/// all equal, compared in that order.
inline bool operator==(Response a, Response b)
{
    return std::tie(a.statusCode, a.body, a.header, a.extraData)
        == std::tie(b.statusCode, b.body, b.header, b.extraData);
}
class BaseJob
{
public:
struct Get {};
struct Post {};
struct Put {};
struct Delete {};
using Method = std::variant<Get, Post, Put, Delete>;
static Get GET;
static Post POST;
static Put PUT;
static Delete DELETE;
/// Query parameters: an ordered list of (key, value) string pairs with all
/// the constructors of the underlying vector.
class Query : public std::vector<std::pair<std::string, std::string>>
{
    using BaseT = std::vector<std::pair<std::string, std::string>>;
public:
    using BaseT::BaseT;

    /// Appends the pair (k, v) at the end of the list.
    void add(std::string k, std::string v) {
        emplace_back(std::move(k), std::move(v));
    }
};
using Body = ::Kazv::Body;
using BytesBody = ::Kazv::BytesBody;
using JsonBody = ::Kazv::JsonBody;
using EmptyBody = ::Kazv::EmptyBody;
using Header = ::Kazv::Header;
using Response = ::Kazv::Response;
enum ReturnType {
Json,
Byte,
};
BaseJob(std::string serverUrl,
std::string requestUrl,
Method method,
std::string jobId,
std::string token = {},
ReturnType returnType = ReturnType::Json,
Body body = EmptyBody{},
Query query = {},
Header header = {});
bool shouldReturnJson() const;
std::string url() const;
BytesBody requestBody() const;
Header requestHeader() const;
ReturnType returnType() const;
/// returns the non-encoded query as an array of pairs
Query requestQuery() const;
Method requestMethod() const;
static bool contentTypeMatches(immer::array<std::string> expected, std::string actual);
Response genResponse(Response r) const;
BaseJob withData(JsonWrap j) &&;
BaseJob withData(JsonWrap j) const &;
+ BaseJob withQueue(std::string id, JobQueuePolicy policy = AlwaysContinue) &&;
+ BaseJob withQueue(std::string id, JobQueuePolicy policy = AlwaysContinue) const &;
+
+ json dataJson(const std::string &key) const;
+ std::string dataStr(const std::string &key) const;
+ std::string jobId() const;
+ std::optional<std::string> queueId() const;
+ JobQueuePolicy queuePolicy() const;
+
protected:
void attachData(JsonWrap data);
private:
friend bool operator==(BaseJob a, BaseJob b);
struct Private;
Descendent<Private> m_d;
};
bool operator==(BaseJob a, BaseJob b);
bool operator!=(BaseJob a, BaseJob b);
inline bool operator==(BaseJob::Get, BaseJob::Get) { return true; }
inline bool operator==(BaseJob::Post, BaseJob::Post) { return true; }
inline bool operator==(BaseJob::Put, BaseJob::Put) { return true; }
inline bool operator==(BaseJob::Delete, BaseJob::Delete) { return true; }
namespace detail
{
template<class T>
struct AddToQueryT
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
q.add(name, std::to_string(std::forward<U>(arg)));
}
};
template<>
struct AddToQueryT<std::string>
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
q.add(name, std::forward<U>(arg));
}
};
/// bool values are added to the query as the text `true` or `false`.
template<>
struct AddToQueryT<bool>
{
    template<class U>
    static void call(BaseJob::Query &q, std::string name, U &&arg) {
        if (static_cast<bool>(std::forward<U>(arg))) {
            q.add(name, std::string("true"));
        } else {
            q.add(name, std::string("false"));
        }
    }
};
template<class T>
struct AddToQueryT<immer::array<T>>
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
for (auto v : std::forward<U>(arg)) {
q.add(name, v);
}
}
};
/// A json argument adds one query entry per item of `arg`, keyed by the
/// item's own key; the `name` argument is not used.
template<>
struct AddToQueryT<json>
{
// call() is not a template here, unlike the other specializations; see
// https://github.com/nlohmann/json/issues/2040
static void call(BaseJob::Query &q, std::string /* name */, const json &arg) {
// assume v is string type: it is passed where q.add() expects a std::string
for (auto [k, v] : arg.items()) {
q.add(k, v);
}
}
};
}
/// Adds `arg` to the query `q` under `name`.
/// The work is dispatched to detail::AddToQueryT, selected on the decayed
/// type of `arg`.
template<class T>
inline void addToQuery(BaseJob::Query &q, std::string name, T &&arg)
{
detail::AddToQueryT<std::decay_t<T>>::call(q, name, std::forward<T>(arg));
}
namespace detail
{
template<class T>
struct AddToQueryIfNeededT
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
if constexpr (detail::hasEmptyMethod(arg)) {
if (! arg.empty()) {
addToQuery(q, name, std::forward<U>(arg));
}
} else {
addToQuery(q, name, std::forward<U>(arg));
}
}
};
/// For optionals: the contained value is added to the query only when the
/// optional has one; an empty optional adds nothing.
template<class T>
struct AddToQueryIfNeededT<std::optional<T>>
{
template<class U>
static void call(BaseJob::Query &q, std::string name, U &&arg) {
if (arg.has_value()) {
addToQuery(q, name, std::forward<U>(arg).value());
}
}
};
}
template<class T>
inline void addToQueryIfNeeded(BaseJob::Query &q, std::string name, T &&arg)
{
detail::AddToQueryIfNeededT<std::decay_t<T>>::call(q, name, std::forward<T>(arg));
}
}
diff --git a/src/base/jobinterface.hpp b/src/base/jobinterface.hpp
index 63cf588..b4da446 100644
--- a/src/base/jobinterface.hpp
+++ b/src/base/jobinterface.hpp
@@ -1,39 +1,53 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <functional>
#include "basejob.hpp"
namespace Kazv
{
+ inline const std::string headFailureCancelledErrorCode{"MOE_KAZV_MXC_HEAD_FAILURE_CANCELLED"};
+ inline const std::string headFailureCancelledErrorMsg{"This job is cancelled because one before it failed."};
+ inline const int headFailureCancelledStatusCode{490};
+
struct JobInterface
{
virtual ~JobInterface() = default;
virtual void async(std::function<void()> func) = 0;
- virtual void setTimeout(std::function<void()> func, int ms) = 0;
- virtual void setInterval(std::function<void()> func, int ms) = 0;
+ virtual void setTimeout(std::function<void()> func, int ms,
+ std::optional<std::string> timerId = std::nullopt) = 0;
+ virtual void setInterval(std::function<void()> func, int ms,
+ std::optional<std::string> timerId = std::nullopt) = 0;
+
+ /// cancels all pending timers with timerId
+ virtual void cancel(std::string timerId) = 0;
+
/// makes an async fetch.
- /// callback will not block the current thread.
+ /// callback should not block the current thread.
+ /// if job has a `queueId`, it must be queued and executed only after
+ /// every job before it in the queue `queueId` returned.
+ /// if job has a `queuePolicy` of CancelFutureIfFailed, the failure of
+ /// this job must remove every other job after it in the queue `queueId`.
virtual void submit(BaseJob job, std::function<void(Response)> callback) = 0;
};
}
diff --git a/src/client/client-model.hpp b/src/client/client-model.hpp
index 7a923f9..59b3c01 100644
--- a/src/client/client-model.hpp
+++ b/src/client/client-model.hpp
@@ -1,300 +1,308 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <tuple>
#include <variant>
#include <string>
#include <optional>
#include <lager/context.hpp>
#include <boost/hana.hpp>
#ifndef NDEBUG
#include <lager/debug/cereal/struct.hpp>
#endif
#include <csapi/sync.hpp>
#include <jobinterface.hpp>
#include <eventinterface.hpp>
#include "clientfwd.hpp"
#include "error.hpp"
#include "room/room-model.hpp"
namespace Kazv
{
/// Initial value of ClientModel::nextTxnId.
inline const std::string DEFTXNID{"0"};
/// Visibility choices for room creation (see CreateRoomAction::Visibility).
enum RoomVisibility
{
Private,
Public,
};
/// Preset choices for room creation (see CreateRoomAction::Preset).
enum CreateRoomPreset
{
PrivateChat,
PublicChat,
TrustedPrivateChat,
};
/// State of a client, together with the jobs and triggers produced by the
/// last update that are still waiting to be collected.
struct ClientModel
{
std::string serverUrl;
std::string userId;
std::string token;
std::string deviceId;
bool loggedIn;
Error error;
std::string syncToken;
RoomListModel roomList;
immer::map<std::string /* sender */, Event> presence;
immer::map<std::string /* type */, Event> accountData;
std::string nextTxnId{DEFTXNID};
// jobs and triggers queued by addJob()/addTriggers() until popped
immer::flex_vector<BaseJob> nextJobs;
immer::flex_vector<KazvEvent> nextTriggers;
// helpers
/// Factory for jobs of type Job, carrying the server URL and token to use.
template<class Job>
struct MakeJobT
{
/// Constructs a Job from serverUrl, then token only when
/// Job::needsAuth() is true, then the forwarded arguments.
template<class ...Args>
constexpr auto make(Args &&...args) const {
if constexpr (Job::needsAuth()) {
return Job(
serverUrl,
token,
std::forward<Args>(args)...);
} else {
return Job(
serverUrl,
std::forward<Args>(args)...);
}
}
std::string serverUrl;
std::string token;
};
/// Returns a MakeJobT<Job> filled with this model's serverUrl and token.
template<class Job>
constexpr auto job() const {
return MakeJobT<Job>{serverUrl, token};
}
/// Appends `j` to nextJobs.
inline void addJob(BaseJob j) {
nextJobs = std::move(nextJobs).push_back(std::move(j));
}
/// Returns every queued job and resets nextJobs to DEFVAL.
inline auto popAllJobs() {
auto jobs = std::move(nextJobs);
nextJobs = DEFVAL;
return jobs;
};
/// Appends the single trigger `t` to nextTriggers.
inline void addTrigger(KazvEvent t) {
addTriggers({t});
}
/// Appends all of `c` to nextTriggers.
inline void addTriggers(immer::flex_vector<KazvEvent> c) {
nextTriggers = std::move(nextTriggers) + c;
}
/// Returns every queued trigger and resets nextTriggers to DEFVAL.
inline auto popAllTriggers() {
auto triggers = std::move(nextTriggers);
nextTriggers = DEFVAL;
return triggers;
}
using Action = ClientAction;
using Effect = ClientEffect;
using Result = ClientResult;
/// Reducer entry point; declared here, defined elsewhere.
static Result update(ClientModel m, Action a);
};
// actions:
struct LoginAction {
std::string serverUrl;
std::string username;
std::string password;
std::optional<std::string> deviceName;
};
struct TokenLoginAction
{
std::string serverUrl;
std::string username;
std::string token;
std::string deviceId;
};
struct LogoutAction {};
struct SyncAction {};
struct PaginateTimelineAction
{
std::string roomId;
std::optional<int> limit;
};
struct SendMessageAction
{
std::string roomId;
Event event;
};
struct SendStateEventAction
{
std::string roomId;
Event event;
};
struct CreateRoomAction
{
using Visibility = RoomVisibility;
using Preset = CreateRoomPreset;
Visibility visibility;
std::optional<std::string> roomAliasName;
std::optional<std::string> name;
std::optional<std::string> topic;
immer::array<std::string> invite;
//immer::array<Invite3pid> invite3pid;
std::optional<std::string> roomVersion;
JsonWrap creationContent;
immer::array<Event> initialState;
std::optional<Preset> preset;
std::optional<bool> isDirect;
JsonWrap powerLevelContentOverride;
};
struct GetRoomStatesAction
{
std::string roomId;
};
struct InviteToRoomAction
{
std::string roomId;
std::string userId;
};
struct JoinRoomByIdAction
{
std::string roomId;
};
struct JoinRoomAction
{
std::string roomIdOrAlias;
immer::array<std::string> serverName;
};
struct LeaveRoomAction
{
std::string roomId;
};
struct ForgetRoomAction
{
std::string roomId;
};
struct SetTypingAction
{
std::string roomId;
bool typing;
std::optional<int> timeoutMs;
};
struct PostReceiptAction
{
std::string roomId;
std::string eventId;
};
struct SetReadMarkerAction
{
std::string roomId;
std::string eventId;
};
+ struct UploadContentAction
+ {
+ Bytes content;
+ std::optional<std::string> filename;
+ std::optional<std::string> contentType;
+ std::string uploadId; // to be used by library users
+ };
+
struct ProcessResponseAction
{
Response response;
};
/// Member-wise equality of two models; members are compared in declaration
/// order and the comparison stops at the first difference.
inline bool operator==(ClientModel a, ClientModel b)
{
    return std::tie(a.serverUrl, a.userId, a.token, a.deviceId, a.loggedIn,
                    a.error, a.syncToken, a.roomList, a.presence,
                    a.accountData, a.nextTxnId, a.nextJobs, a.nextTriggers)
        == std::tie(b.serverUrl, b.userId, b.token, b.deviceId, b.loggedIn,
                    b.error, b.syncToken, b.roomList, b.presence,
                    b.accountData, b.nextTxnId, b.nextJobs, b.nextTriggers);
}
#ifndef NDEBUG
// Debug builds declare a cereal serializer for every action struct above.
// NOTE(review): presumably needed by the lager debugger — confirm.
LAGER_CEREAL_STRUCT(LoginAction);
LAGER_CEREAL_STRUCT(TokenLoginAction);
LAGER_CEREAL_STRUCT(LogoutAction);
LAGER_CEREAL_STRUCT(SyncAction);
LAGER_CEREAL_STRUCT(PaginateTimelineAction);
LAGER_CEREAL_STRUCT(SendMessageAction);
LAGER_CEREAL_STRUCT(SendStateEventAction);
LAGER_CEREAL_STRUCT(CreateRoomAction);
LAGER_CEREAL_STRUCT(GetRoomStatesAction);
LAGER_CEREAL_STRUCT(InviteToRoomAction);
LAGER_CEREAL_STRUCT(JoinRoomByIdAction);
LAGER_CEREAL_STRUCT(JoinRoomAction);
LAGER_CEREAL_STRUCT(LeaveRoomAction);
LAGER_CEREAL_STRUCT(ForgetRoomAction);
LAGER_CEREAL_STRUCT(SetTypingAction);
LAGER_CEREAL_STRUCT(PostReceiptAction);
LAGER_CEREAL_STRUCT(ProcessResponseAction);
LAGER_CEREAL_STRUCT(SetReadMarkerAction);
// UploadContentAction was the only action struct without an entry here
LAGER_CEREAL_STRUCT(UploadContentAction);
#endif
/// cereal serialization of ClientModel.
/// nextJobs and nextTriggers are not passed to the archive; every other
/// member is, in declaration order. The version argument is unused.
template<class Archive>
void serialize(Archive &ar, ClientModel &m, std::uint32_t const /*version*/)
{
ar(m.serverUrl, m.userId, m.token, m.deviceId, m.loggedIn,
m.error,
m.syncToken,
m.roomList,
m.presence,
m.accountData,
m.nextTxnId);
}
}
CEREAL_CLASS_VERSION(Kazv::ClientModel, 0);
diff --git a/src/job/cprjobhandler.cpp b/src/job/cprjobhandler.cpp
index 9f5afba..ae68bfb 100644
--- a/src/job/cprjobhandler.cpp
+++ b/src/job/cprjobhandler.cpp
@@ -1,125 +1,327 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
+#include <algorithm>
+#include <deque>
+
+#include <zug/into_vector.hpp>
+#include <zug/transducer/map.hpp>
#include <cpr/cpr.h>
#include "cprjobhandler.hpp"
#include "debug.hpp"
namespace Kazv
{
+ struct CprJobHandler::Private
+ {
+ CprJobHandler *q;
+ boost::asio::io_context::executor_type executor;
+ using TimerSP = std::shared_ptr<boost::asio::steady_timer>;
+ using TimerSPList = std::vector<TimerSP>;
+ using TimerMap = std::unordered_map<std::optional<std::string>, TimerSPList>;
+ TimerMap timerMap;
+
+ enum Status
+ {
+ Waiting,
+ Running,
+ };
+ using Callback = std::function<void(Response)>;
+ struct JobDesc
+ {
+ BaseJob job;
+ Callback callback;
+ Status status;
+ };
+ using JobQueue = std::deque<JobDesc>;
+ using JobMap = std::unordered_map<std::string, JobQueue>;
+ JobMap jobQueues;
+
+ void submitImpl(BaseJob job, std::function<void(Response)> userCallback);
+
+ void addToQueue(BaseJob job, Callback callback) {
+ boost::asio::post(
+ executor,
+ [=] {
+ // precondition: job has a queueId
+ auto queueId = job.queueId().value();
+
+ jobQueues[queueId].push_back(JobDesc{job, callback, Waiting});
+ });
+ }
+
+ void clearQueue(std::string queueId) {
+ boost::asio::post(
+ executor,
+ [=] { clearQueueImpl(queueId); });
+ }
+
+ /// Empties the queue `queueId`. Every job still Waiting gets its callback
+ /// invoked (through q->async) with a fake response carrying the
+ /// head-failure-cancelled status code, error code and message.
+ void clearQueueImpl(std::string queueId) {
+     Response fakeResponse;
+     fakeResponse.statusCode = headFailureCancelledStatusCode;
+     fakeResponse.body = JsonBody(
+         json{ {"errcode", headFailureCancelledErrorCode},
+               {"error", headFailureCancelledErrorMsg} }
+         );
+     dbgJob << "clearQueueImpl called with " << queueId << std::endl;
+
+     auto &queue = jobQueues[queueId];
+     for (const auto &desc : queue) {
+         dbgJob << "this job is " << (desc.status == Waiting ? "Waiting" : "Running") << std::endl;
+         if (desc.status == Waiting) {
+             // Run callback in a different thread, just as in submitImpl().
+             // Explicit init-captures are used because structured bindings
+             // cannot be captured by a lambda before C++20.
+             q->async([job = desc.job, callback = desc.callback, fakeResponse] {
+                 callback(job.genResponse(fakeResponse));
+             });
+         }
+         // if status is Running, the callback is already called
+     }
+     queue.clear();
+ }
+
+ void popJob(std::string queueId) {
+ boost::asio::post(
+ executor,
+ [=] { popJobImpl(queueId); });
+ }
+
+ /// Removes the first job of the queue `queueId`, if the queue has one.
+ void popJobImpl(std::string queueId) {
+     dbgJob << "popJobImpl called with " << queueId << std::endl;
+     auto &queue = jobQueues[queueId];
+     if (queue.empty()) {
+         return;
+     }
+     dbgJob << "the first job is "
+            << (queue.front().status == Waiting ? "Waiting" : "Running") << std::endl;
+     queue.pop_front();
+ }
+
+ /// Posts a task to the executor that looks at the head of every queue and
+ /// submits it if it is still Waiting. When the submitted job returns, the
+ /// user callback runs first; then the whole queue is cleared if the job
+ /// failed and its policy is CancelFutureIfFailed, otherwise only the head
+ /// is popped.
+ void monitorQueues() {
+     boost::asio::post(
+         executor,
+         [this] {
+             for (auto &entry : jobQueues) {
+                 const std::string &queueId = entry.first;
+                 JobQueue &queue = entry.second; // need to change queue
+                 dbgJob << "monitoring queue " << queueId << std::endl;
+                 if (queue.empty()) {
+                     continue;
+                 }
+                 JobDesc &head = queue.front();
+                 dbgJob << "the first job is "
+                        << (head.status == Waiting ? "Waiting" : "Running") << std::endl;
+                 if (head.status != Waiting) {
+                     continue;
+                 }
+                 head.status = Running;
+                 // Copies are captured explicitly: the callback outlives this
+                 // loop, and structured bindings cannot be captured by a
+                 // lambda before C++20.
+                 submitImpl(
+                     head.job,
+                     [this, queueId, job = head.job, callback = head.callback](Response r) { // in new thread
+                         callback(r);
+
+                         if (! r.success() // should be enough for now
+                             && job.queuePolicy() == CancelFutureIfFailed) {
+                             clearQueue(queueId); // in executor thread
+                         } else {
+                             popJob(queueId); // in executor thread
+                         }
+                     });
+             }
+         });
+ }
+
+ void addTimerToMap(TimerSP timer, std::optional<std::string> timerId) {
+ boost::asio::post(
+ executor,
+ [=] {
+ timerMap[timerId].push_back(timer);
+ });
+ }
+
+ /// Posts a task to the executor that drops `timer` from the list of
+ /// timers registered under `timerId`.
+ void clearTimer(TimerSP timer, std::optional<std::string> timerId) {
+     boost::asio::post(
+         executor,
+         [this, timer, timerId] {
+             const auto it = timerMap.find(timerId);
+             if (it == timerMap.end()) {
+                 return;
+             }
+             auto &timers = it->second;
+             // std::remove only moves the kept elements to the front; the
+             // erase is what actually shrinks the list. Without it the
+             // finished timer was never released.
+             timers.erase(std::remove(timers.begin(), timers.end(), timer),
+                          timers.end());
+         });
+ }
+
+ void cancelAllTimers(std::optional<std::string> timerId) {
+ boost::asio::post(
+ executor,
+ [=] {
+ cancelAllTimersImpl(timerId);
+ });
+ }
+
+ /// Removes the entry `timerId` from the timer map and cancels every timer
+ /// that was registered under it.
+ void cancelAllTimersImpl(std::optional<std::string> timerId) {
+     // take the list out of the map first, then cancel what was in it
+     TimerSPList pending = std::move(timerMap[timerId]);
+     timerMap.erase(timerId);
+     for (const auto &pendingTimer : pending) {
+         pendingTimer->cancel();
+     }
+ }
+
+ void intervalTimerCallback(TimerSP timer,
+ std::function<void()> func,
+ int ms,
+ const boost::system::error_code &error) {
+ if (! error) {
+ func();
+ auto dur = boost::asio::chrono::milliseconds(ms);
+ timer->expires_at(timer->expiry() + dur);
+ timer->async_wait(
+ [=](const boost::system::error_code &error) {
+ intervalTimerCallback(timer, func, ms, error);
+ });
+ }
+ }
+ };
+
CprJobHandler::CprJobHandler(boost::asio::io_context::executor_type executor)
- : executor(std::move(executor))
+ : m_d(Private{this, std::move(executor), Private::TimerMap{}, Private::JobMap{}})
{
+ setInterval(
+ [=] {
+ m_d->monitorQueues();
+ },
+ 50, // ms
+ "-queue-monitor");
}
CprJobHandler::~CprJobHandler() = default;
void CprJobHandler::async(std::function<void()> func)
{
- std::thread([func=std::move(func), guard=boost::asio::executor_work_guard(executor)]() {
+ std::thread([func=std::move(func), guard=boost::asio::executor_work_guard(m_d->executor)]() {
func();
}).detach();
}
- void CprJobHandler::setTimeout(std::function<void()> func, int ms)
+ void CprJobHandler::setTimeout(std::function<void()> func, int ms, std::optional<std::string> timerId)
{
auto timer=std::make_shared<boost::asio::steady_timer>(
- executor, boost::asio::chrono::milliseconds(ms));
+ m_d->executor, boost::asio::chrono::milliseconds(ms));
+
+ m_d->addTimerToMap(timer, timerId);
+
timer->async_wait(
[=, timer=timer](const boost::system::error_code &error){
if (! error) {
func();
+ this->m_d->clearTimer(timer, timerId);
}
});
}
- void CprJobHandler::setInterval(std::function<void()> func, int ms)
+ void CprJobHandler::setInterval(std::function<void()> func, int ms, std::optional<std::string> timerId)
{
auto dur = boost::asio::chrono::milliseconds(ms);
- auto timer = std::make_shared<boost::asio::steady_timer>(executor, dur);
+ auto timer = std::make_shared<boost::asio::steady_timer>(m_d->executor, dur);
+
+ m_d->addTimerToMap(timer, timerId);
+
timer->async_wait(
- [=, timer=timer](const boost::system::error_code &error) {
- if (!error) {
- func();
- timer->expires_at(timer->expiry() + dur);
- }
+ [=](const boost::system::error_code &error) {
+ m_d->intervalTimerCallback(timer, func, ms, error);
});
}
+ void CprJobHandler::cancel(std::string timerId)
+ {
+ m_d->cancelAllTimers(timerId);
+ }
+
void CprJobHandler::submit(BaseJob job, std::function<void(Response)> userCallback)
+ {
+ if (job.queueId()) {
+ m_d->addToQueue(job, userCallback);
+ } else {
+ m_d->submitImpl(job, userCallback);
+ }
+ }
+
+ void CprJobHandler::Private::submitImpl(BaseJob job, std::function<void(Response)> userCallback)
{
cpr::Url url{job.url()};
cpr::Body body(job.requestBody());
BaseJob::Header origHeader = job.requestHeader();
cpr::Header header(origHeader.get().begin(), origHeader.get().end());
cpr::Parameters params;
BaseJob::Query query = job.requestQuery();
BaseJob::ReturnType returnType = job.returnType();
BaseJob::Method method = job.requestMethod();
if (! query.empty()) {
// from cpr/parameters.cpp
cpr::CurlHolder holder;
for (const auto kv : query) {
std::string key = kv.first;
std::string value = kv.second;
params.AddParameter(cpr::Parameter(std::move(key), std::move(value)), holder);
}
}
auto callback = [returnType](cpr::Response r) -> Response {
Body body = r.text;
if (returnType == BaseJob::ReturnType::Json) {
try {
body = BaseJob::JsonBody(std::move(json::parse(r.text)));
} catch (const json::exception &e) {
// the response is not valid json
dbgJob << "body is not correct json: " << e.what() << std::endl;
}
}
return { r.status_code,
body,
BaseJob::Header(r.header.begin(), r.header.end()),
{} // extraData, will be added in genResponse
};
};
std::shared_future<Response> res = std::visit(lager::visitor{
[=](BaseJob::Get) {
return cpr::GetCallback(callback, url, header, body, params);
},
[=](BaseJob::Post) {
return cpr::PostCallback(callback, url, header, body, params);
},
[=](BaseJob::Put) {
return cpr::PutCallback(callback, url, header, body, params);
},
[=](BaseJob::Delete) {
return cpr::DeleteCallback(callback, url, header, body, params);
}
}, method).share();
- async([=]() {
- userCallback(job.genResponse(res.get()));
- });
+ q->async([=]() {
+ userCallback(job.genResponse(res.get()));
+ });
+ }
+
+ void CprJobHandler::stop()
+ {
+ boost::asio::post(
+ m_d->executor,
+ [=] {
+ auto ids = zug::into_vector(
+ zug::map([](auto i) { return i.first; }),
+ m_d->timerMap);
+ for (auto id : ids) {
+ m_d->cancelAllTimersImpl(id);
+ }
+ m_d->jobQueues.clear();
+ });
}
}
diff --git a/src/job/cprjobhandler.hpp b/src/job/cprjobhandler.hpp
index 9cd12bb..80c4e2f 100644
--- a/src/job/cprjobhandler.hpp
+++ b/src/job/cprjobhandler.hpp
@@ -1,40 +1,48 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <boost/asio.hpp>
#include "jobinterface.hpp"
+#include "descendent.hpp"
namespace Kazv
{
struct CprJobHandler : public JobInterface
{
CprJobHandler(boost::asio::io_context::executor_type executor);
~CprJobHandler() override;
void async(std::function<void()> func) override;
- void setTimeout(std::function<void()> func, int ms) override;
- void setInterval(std::function<void()> func, int ms) override;
+ void setTimeout(std::function<void()> func, int ms, // schedules func once; ms is presumably milliseconds
+ std::optional<std::string> timerId = std::nullopt) override; // optional id; presumably usable with cancel()
+ void setInterval(std::function<void()> func, int ms, // schedules func repeatedly (the tests see one call per interval)
+ std::optional<std::string> timerId = std::nullopt) override; // NOTE(review): <optional>/<string>/<functional> are not included directly here - confirm jobinterface.hpp provides them
+ void cancel(std::string timerId) override; // used by the tests to stop an interval that was registered with this id
+
void submit(BaseJob job,
std::function<void(Response)> callback) override;
+
+ void stop(); // posts a task that cancels all timers and clears the job queues
private:
- boost::asio::io_context::executor_type executor;
+ struct Private; // holds the handler state (executor, timerMap, jobQueues are accessed through it)
+ Descendent<Private> m_d; // NOTE(review): presumably an owning pimpl handle - confirm against descendent.hpp
};
}
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index e7253ec..e9c4257 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -1,20 +1,21 @@
include(CTest)
add_executable(kazvtest
testmain.cpp
basejobtest.cpp
cursorutiltest.cpp
client/client-test-util.cpp
client/sync-test.cpp
+ kazvjobtest.cpp # timer, cancel and job-queue tests for CprJobHandler
)
target_link_libraries(kazvtest
PRIVATE Catch2::Catch2
PRIVATE kazv
PRIVATE kazvjob
PRIVATE nlohmann_json::nlohmann_json
PRIVATE immer
PRIVATE lager
PRIVATE zug)
diff --git a/src/tests/basejobtest.cpp b/src/tests/basejobtest.cpp
index 10553a0..cfab429 100644
--- a/src/tests/basejobtest.cpp
+++ b/src/tests/basejobtest.cpp
@@ -1,53 +1,54 @@
/*
* Copyright (C) 2020 Tusooa Zhu
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/
#include <catch2/catch.hpp>
#include <iostream>
#include <future>
#include <basejob.hpp>
#include <cprjobhandler.hpp>
#include "tests.hpp"
using namespace Kazv;
TEST_CASE("Base job should fetch correctly", "[basejob]")
{
boost::asio::io_context ioContext;
BaseJob job(TEST_SERVER_URL, "/.well-known/matrix/client", BaseJob::Get{}, "TestJob");
CprJobHandler h(ioContext.get_executor());
h.submit(
job.withData(json{{"test", "bar"}}),
- [](auto r) {
+ [&h](auto r) { // h is captured so the callback can stop the handler. NOTE(review): a non-200 response skips every assertion below
if (r.statusCode == 200) {
REQUIRE( isBodyJson(r.body) );
json j = r.jsonBody().get();
REQUIRE_NOTHROW( (j["m.homeserver"]["base_url"]) );
REQUIRE( (j["m.homeserver"]["base_url"].size() > 0) );
REQUIRE( r.dataStr("test") == "bar" );
}
+ h.stop(); // cancels timers and clears job queues; presumably this is what lets ioContext.run() return - confirm
});
ioContext.run();
}
diff --git a/src/tests/kazvjobtest.cpp b/src/tests/kazvjobtest.cpp
new file mode 100644
index 0000000..b9f0e3e
--- /dev/null
+++ b/src/tests/kazvjobtest.cpp
@@ -0,0 +1,147 @@
+/*
+ * Copyright (C) 2020 Tusooa Zhu
+ *
+ * This file is part of libkazv.
+ *
+ * libkazv is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * libkazv is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with libkazv. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <vector>
+#include <iostream>
+
+#include <catch2/catch.hpp>
+#include <cprjobhandler.hpp>
+
+#include "tests.hpp"
+
+using namespace Kazv;
+
+TEST_CASE("setTimeout should behave properly", "[kazvjob]")
+{
+ // Timers are expected to fire in order of delay, not of registration.
+ boost::asio::io_context ioContext;
+ CprJobHandler h(ioContext.get_executor());
+ std::vector<int> fired;
+
+ // The slower timer is registered first and also calls h.stop().
+ auto slow = [&fired, &h] {
+ fired.push_back(500);
+ h.stop();
+ };
+ auto fast = [&fired] {
+ fired.push_back(100);
+ };
+ h.setTimeout(slow, 500);
+ h.setTimeout(fast, 100);
+
+ ioContext.run();
+
+ REQUIRE( fired.size() == 2 );
+ REQUIRE( fired[0] == 100 );
+ REQUIRE( fired[1] == 500 );
+}
+
+TEST_CASE("setInterval should behave properly", "[kazvjob]")
+{
+ boost::asio::io_context ioContext;
+ CprJobHandler h(ioContext.get_executor());
+ std::vector<int> ticks;
+
+ // Record every tick; from the second tick on, call h.stop().
+ auto onTick = [&ticks, &h] {
+ ticks.push_back(50);
+ std::cout << "timer executed" << std::endl;
+ if (ticks.size() < 2) {
+ return;
+ }
+ h.stop();
+ };
+ h.setInterval(onTick, 100);
+ ioContext.run();
+
+ REQUIRE( ticks.size() == 2 );
+ REQUIRE( ticks[0] == 50 );
+ REQUIRE( ticks[1] == 50 );
+}
+
+TEST_CASE("setInterval can be cancelled", "[kazvjob]")
+{
+ boost::asio::io_context ioContext;
+ CprJobHandler h(ioContext.get_executor());
+ std::vector<int> ticks;
+ const std::string timerId{"testTimerId"};
+
+ // The interval cancels itself, by id, once it has ticked twice.
+ auto onTick = [&ticks, &h, timerId] {
+ ticks.push_back(50);
+ std::cout << "timer executed" << std::endl;
+ if (ticks.size() < 2) {
+ return;
+ }
+ h.cancel(timerId);
+ };
+ h.setInterval(onTick, 100, timerId);
+
+ // A separate 300 ms timeout calls h.stop(); the assertions below expect
+ // that no further tick was recorded after the cancel.
+ auto shutDown = [&h] { h.stop(); };
+ h.setTimeout(shutDown, 300);
+
+ ioContext.run();
+
+ REQUIRE( ticks.size() == 2 );
+ REQUIRE( ticks[0] == 50 );
+ REQUIRE( ticks[1] == 50 );
+}
+
+static BaseJob succJob = // GET on the well-known client path, placed in queue "testjob"; the queue test expects it to succeed
+ BaseJob(TEST_SERVER_URL, "/.well-known/matrix/client", BaseJob::Get{}, "TestJob")
+ .withQueue("testjob");
+
+static BaseJob failJob = // GET on a path the queue test expects to fail; same queue as succJob
+ BaseJob(TEST_SERVER_URL, "/.well-known/jfasjewfn/awejioaewjgjaad/fawre", BaseJob::Get{}, "AnotherTestJob")
+ .withQueue("testjob");
+
+static BaseJob failJobWithCancel = // failJob re-queued with the CancelFutureIfFailed policy; must stay below failJob, which it copies from
+ failJob.withQueue("testjob", CancelFutureIfFailed);
+
+TEST_CASE("Job queue should behave properly", "[kazvjob]")
+{
+ // One entry per invocation of `record`: the success flag of that response.
+ std::vector<bool> outcomes;
+ std::function<void(Response)> record =
+ [&outcomes](Response res) {
+ std::cout << "callback called" << std::endl;
+ outcomes.push_back(res.success());
+ };
+
+ boost::asio::io_context ioContext;
+ CprJobHandler h(ioContext.get_executor());
+
+ // All five jobs share the queue "testjob"; the expected flag is noted per line.
+ h.submit(succJob, record); // true
+ h.submit(failJob, record); // false
+ h.submit(failJobWithCancel, record); // false
+ h.submit(succJob, record); // false (because this is cancelled)
+ // The last job records nothing; its callback only stops the handler.
+ h.submit(succJob, [&h](Response) { h.stop(); });
+
+ ioContext.run();
+
+ REQUIRE( outcomes.size() == 4 );
+ REQUIRE( outcomes[0] == true );
+ REQUIRE( outcomes[1] == false );
+ REQUIRE( outcomes[2] == false );
+ REQUIRE( outcomes[3] == false );
+}

File Metadata

Mime Type
text/x-diff
Expires
Sun, Jan 19, 10:59 AM (58 m, 11 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
55115
Default Alt Text
(48 KB)

Event Timeline