1 #include "hmbdc/Copyright.hpp"
4 #include "hmbdc/comm/Uuid.hpp"
5 #include "hmbdc/Exception.hpp"
12 #include <condition_variable>
14 namespace hmbdc {
namespace app {
16 namespace request_reply_proxy_client_detail {
17 template <
typename CcClient,
typename Message>
20 if (m.typeTag == Message::typeTag
21 && uuid == m.get<Message>().uuid) {
22 c.handleReplyCb(m.get<Message>().reply);
24 auto& wrap =
static_cast<Wrap&
>(m);
33 template <
typename CcClient>
39 c.handleJustBytesReplyCb(m.typeTag, j);
45 template <
typename CcClient,
typename ... Messages>
50 template <
typename CcClient,
typename M,
typename ... Messages>
54 return ReplyDispatcher<CcClient, std::tuple<Messages ...>>()(c, e, uuid);
61 std::function<void()> send;
62 bool proxyReady =
false;
63 bool allRepliesReceived =
false;
65 std::condition_variable cond;
68 template <
typename CcClient>
70 using Interests =
typename CcClient::Replies;
75 INTERESTS_SIZE = std::tuple_size<Interests>::value,
77 char const* hmbdcName()
const {
return nullptr; }
78 std::tuple<char const*, int> schedSpec()
const {
79 return std::make_tuple<char const*, int>(
nullptr, 0);
81 size_t maxBatchMessageCount()
const {
return std::numeric_limits<size_t>::max();}
82 void messageDispatchingStartedCb(uint16_t threadSerialNumber) {
85 void stoppedCb(std::exception
const& e) {
87 bool droppedCb() {
delete this;
return true;};
91 : syncData(std::make_shared<SyncData>())
92 , uuid(comm::Uuid::generate())
96 std::shared_ptr<SyncData> syncData;
98 void invokedCb(uint16_t threadId) {
99 if (syncData.use_count() == 1) {
104 template <
typename Iterator>
105 void handleRangeImpl(Iterator it,
106 Iterator end, uint16_t threadId) {
108 for (;hmbdc_likely(it != end); ++it) {
110 c_, *static_cast<MessageHead*>(*it), uuid);
113 std::unique_lock<std::mutex> lk(syncData->mux);
114 syncData->allRepliesReceived =
true;
115 syncData->cond.notify_one();
127 template <
typename ReplyT>
128 bool operator()(ReplyT&& t) {
return t.uuid == *uuid;}
137 template <RequestC RequestT,
typename Sender,
typename ReplyContext,
typename ReplyHandler
143 , ReplyHandler& replyHandler
145 static_assert(ReplyContext::cpa::has_pool && !ReplyContext::cpa::pool_msgless
146 ,
"Contex needs to have a pool");
147 using namespace request_reply_proxy_client_detail;
148 auto proxy =
new RequestReplyProxyClient<ReplyHandler>(replyHandler);
149 auto syncData = proxy->syncData;
150 request.uuid = proxy->uuid;
151 syncData->send = [&sender, &request]() {
152 sender.send(std::forward<RequestT>(request));
154 ctx.addToPool(*proxy);
155 std::unique_lock<std::mutex> lk(syncData->mux);
156 return syncData->cond.wait_for(lk, std::chrono::microseconds(timeout.microseconds())
157 , [syncData]() {
return syncData->allRepliesReceived;});
165 template <
typename CcSender>
197 template <RequestC RequestT,
typename ReplyContext,
typename ReplyHandler>
201 , ReplyHandler& replyHandler
203 static_assert(ReplyContext::cpa::has_pool && !ReplyContext::cpa::pool_msgless
204 ,
"not supported - needs to have a pool");
206 , *static_cast<CcSender*>(
this)
228 template <MessageC M>
231 (
static_cast<CcSender*
>(
this))->send(std::forward<
REPLY<M>>(reply));
249 template <MessageC M>
252 static_cast<CcSender*
>(
this)->send(reply);
bool requestReply(RequestT &&request, Sender &sender, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout)
not an interface exposed to end user
Definition: RequestReply.hpp:140
template that convert a regular message to be a reply used in request / reply sync messaging - see Re...
Definition: Message.hpp:363
void sendReply(REPLY< M > &&reply)
part of synchronous request reply interface. Send a constructed REPLY using the CcSender, it will only reach to the origianl sender Client. ALL REPLY via a Sender need to sent out using this interface to avoid undefined behavior.
Definition: RequestReply.hpp:230
synchronous request reply interface for the Network Sender
Definition: RequestReply.hpp:166
Definition: RequestReply.hpp:60
void sendReply(REPLY< M > const &reply)
part of synchronous request reply interface. Send a constructed REPLY using the CcSender, it will only reach to the origianl sender Client. ALL REPLY via a Sender need to sent out using this interface to avoid undefined behavior.
Definition: RequestReply.hpp:251
a std tuple holding REPLY messages types it can dispatch
Definition: RequestReply.hpp:125
Definition: Message.hpp:78
A special type of message only used on the receiving side.
Definition: Message.hpp:182
bool requestReply(RequestT &&request, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout=time::Duration::seconds(0xffffffff))
The caller is blocked until the replies to this request are heard and processed. Just like regular Me...
Definition: RequestReply.hpp:199
Definition: RequestReply.hpp:69
Definition: RequestReply.hpp:18
Definition: Message.hpp:112
Definition: RequestReply.hpp:46
Definition: Message.hpp:377
Exception that just has an exit code.
Definition: Exception.hpp:28
a std tuple holding messages types it can dispatch except REPLYs
Definition: BlockingContext.hpp:183
a std tuple holding REQUEST messages types it can dispatch