hmbdc
simplify-high-performance-messaging-programming
RequestReply.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/comm/Uuid.hpp"
5 #include "hmbdc/Exception.hpp"
6 
7 
8 #include <functional>
9 #include <mutex>
10 #include <tuple>
11 #include <memory>
12 #include <condition_variable>
13 
14 namespace hmbdc { namespace app {
15 
16 namespace request_reply_proxy_client_detail {
17 template <typename CcClient, typename Message>
18 struct dispatcher {
19  bool operator()(CcClient& c, MessageHead& m, comm::Uuid const& uuid) {
20  if (m.typeTag == Message::typeTag
21  && uuid == m.get<Message>().uuid) {
22  c.handleReplyCb(m.get<Message>().reply);
23  using Wrap = MessageWrap<Message>;
24  auto& wrap = static_cast<Wrap&>(m);
25  wrap.~Wrap();
26  return true;
27  }
28  return false;
29  }
30 };
31 
32 
33 template <typename CcClient>
34 struct dispatcher<CcClient, REPLY<JustBytes>> {
35  bool operator()(CcClient& c, MessageHead& m, comm::Uuid const& uuid) {
36  //only deliver user messages
37  if (m.typeTag > 1000 && uuid == m.get<REPLY<JustBytes>>().uuid) {
38  auto j = &m.get<REPLY<JustBytes>>().reply;
39  c.handleJustBytesReplyCb(m.typeTag, j);
40  }
41  return true;
42  }
43 };
44 
45 template <typename CcClient, typename ... Messages>
47  bool operator()(CcClient&, MessageHead&, comm::Uuid const&) const{return false;}
48 };
49 
50 template <typename CcClient, typename M, typename ... Messages>
51 struct ReplyDispatcher<CcClient, std::tuple<M, Messages...>> {
52  bool operator()(CcClient& c, MessageHead& e, comm::Uuid const& uuid) const {
53  if (!dispatcher<CcClient, M>()(c, e, uuid))
54  return ReplyDispatcher<CcClient, std::tuple<Messages ...>>()(c, e, uuid);
55  else
56  return true;
57  }
58 };
59 
/**
 * @brief state shared between a blocked requestReply caller and its proxy client
 */
struct SyncData {
    /// action that sends the request; empty until the caller assigns it
    std::function<void()> send;
    /// starts out false; not read or written anywhere else in this listing
    bool proxyReady{false};
    /// set to true (under mux) to release the waiting caller
    bool allRepliesReceived{false};
    /// guards allRepliesReceived
    std::mutex mux;
    /// signalled after allRepliesReceived changes
    std::condition_variable cond;
};
67 
// Proxy client that sits in a pool on behalf of a blocked requestReply caller:
// it triggers the send of the request once dispatching starts, forwards matching
// replies to the wrapped CcClient and wakes the caller when a handler throws.
//
// NOTE(review): the line declaring the struct (between the template header and
// the first using-declaration) is missing from this listing - confirm against
// the original header.
68 template <typename CcClient>
// the only messages this proxy is interested in are the wrapped client's Replies
70  using Interests = typename CcClient::Replies;
71  using Replies = std::tuple<>;
72  using Requests = std::tuple<>;
73 
74  enum {
75  INTERESTS_SIZE = std::tuple_size<Interests>::value,
76  };
// no name and a (nullptr, 0) scheduling spec are reported for this proxy
77  char const* hmbdcName() const { return nullptr; }
78  std::tuple<char const*, int> schedSpec() const {
79  return std::make_tuple<char const*, int>(nullptr, 0);
80  }
81  size_t maxBatchMessageCount() const {return std::numeric_limits<size_t>::max();}
// runs the send action stored in syncData; threadSerialNumber is not used
82  void messageDispatchingStartedCb(uint16_t threadSerialNumber) {
83  syncData->send();
84  }
// intentionally empty - the exception is ignored here
85  void stoppedCb(std::exception const& e) {
86  }
// the proxy destroys itself here, so instances have to be created with new
87  bool droppedCb() {delete this; return true;};
88  virtual ~RequestReplyProxyClient(){}
89 
// creates a fresh SyncData and a freshly generated uuid; c receives the replies
90  RequestReplyProxyClient(CcClient& c)
91  : syncData(std::make_shared<SyncData>())
92  , uuid(comm::Uuid::generate())
93  , c_(c) {
94  }
95 
// shared with the requestReply caller, which keeps its own copy while it waits
96  std::shared_ptr<SyncData> syncData;
97 
// use_count() == 1 means this proxy holds the last reference to syncData
// (presumably because the requestReply caller has returned); the proxy then
// throws ExitCode(1)
98  void invokedCb(size_t) {
99  if (syncData.use_count() == 1) {
100  throw ExitCode(1);
101  }
102  }
103 
// Walks [it, end) and counts the entries for which the condition below holds;
// threadId is not used.
// If anything throws, allRepliesReceived is set under the mutex, one waiter is
// notified and the exception is rethrown.
104  template <typename Iterator>
105  size_t handleRangeImpl(Iterator it,
106  Iterator end, uint16_t threadId) {
107  try {
108  size_t res = 0;
109  for (;hmbdc_likely(it != end); ++it) {
// NOTE(review): the line that opens this condition is missing from this
// listing; only its argument list (c_, the message head, uuid) is visible
111  c_, *static_cast<MessageHead*>(*it), uuid)) res++;
112  }
113  return res;
114  } catch (...) {
115  std::unique_lock<std::mutex> lk(syncData->mux);
116  syncData->allRepliesReceived = true;
117  syncData->cond.notify_one();
118  throw;
119  }
120  }
121 
// identifies the request this proxy waits on; replies are matched against it
122  const comm::Uuid uuid;
123 private:
// the client whose reply callbacks are invoked
124  CcClient& c_;
125 };
126 
// NOTE(review): the line declaring this struct is missing from this listing, so
// its name cannot be read here - confirm against the original header.
// Members of a predicate object that reports whether a reply carries the uuid
// pointed to by `uuid`; the pointer is dereferenced without a null check.
128  comm::Uuid const* uuid;
129  template <typename ReplyT>
130  bool operator()(ReplyT&& t) {return t.uuid == *uuid;}
131 
132 };
133 
134 }
135 
136 /**
137  * @brief not an interface exposed to end user
138  */
139 template <RequestC RequestT, typename Sender, typename ReplyContext, typename ReplyHandler
140 >
141 bool
142 requestReply(RequestT &&request
143  , Sender& sender
144  , ReplyContext& ctx
145  , ReplyHandler& replyHandler
146  , time::Duration timeout) {
147  static_assert(ReplyContext::cpa::has_pool && !ReplyContext::cpa::pool_msgless
148  , "Contex needs to have a pool");
149  using namespace request_reply_proxy_client_detail;
150  auto proxy = new RequestReplyProxyClient<ReplyHandler>(replyHandler);
151  auto syncData = proxy->syncData;
152  request.uuid = proxy->uuid;
153  syncData->send = [&sender, &request]() {
154  sender.send(std::forward<RequestT>(request));
155  };
156  ctx.addToPool(*proxy);
157  std::unique_lock<std::mutex> lk(syncData->mux);
158  return syncData->cond.wait_for(lk, std::chrono::microseconds(timeout.microseconds())
159  , [syncData]() {return syncData->allRepliesReceived;});
160 }
161 
162 /**
163  * @brief synchronous request reply interface for the Network Sender
164  *
165  * @tparam CcSender concrete Sender
166  */
167 template <typename CcSender>
169  /**
170  * @brief The caller is blocked until the replies to this request are heard and
171  * processed. Just like regular Messages REQUEST is sent out to Clients through
172  * the CcSender
173  *
174  * @details
175  *
176  * @tparam RequestT the request type - a Message wrapped in the REQUEST<> template
177  * @tparam ReplyContext the Context Type that ReplyT messages need to be able
178  * to reach, it needs to have a pool
179  * @snippet blind-auction.cpp nework request reply example
180  * @tparam ReplyHandler type for how to handle the replies.
181  * example of a handler that expects two-step replies for a requestReply call:
182  * @code
183  * struct ClientAlsoHandlesReplies
184  * : Client<ClientAlsoHandlesReplies, REPLY<Reply0>, REPLY<Reply1>, OtherMsg> {
185  * void handleReplyCb(Reply0 const&){
186  * // got step 1 reply
187  * }
188  * void handleReplyCb(Reply1 const&){
189  * // got step 2 reply
190  * throw hmbdc::ExitCode(0); // all replies received, now
191  * // throw to unblock the requestReply call
192  * }
193  * @endcode
194  *
195  * @param request the REQUEST sent out
196  * @param ctx the replies are expected to show up in this Context which contains a pool
197  * @param replyHandler the callback logic to handle replies
198  */
199  template <RequestC RequestT, typename ReplyContext, typename ReplyHandler>
200  bool
201  requestReply(RequestT&& request
202  , ReplyContext& ctx
203  , ReplyHandler& replyHandler
204  , time::Duration timeout = time::Duration::seconds(0xffffffff)) {
205  static_assert(ReplyContext::cpa::has_pool && !ReplyContext::cpa::pool_msgless
206  , "not supported - needs to have a pool");
207  return hmbdc::app::requestReply(std::forward<RequestT>(request)
208  , *static_cast<CcSender*>(this)
209  , ctx
210  , replyHandler
211  , timeout);
212  }
213 
214 
215  /**
216  * @brief part of the synchronous request reply interface. Send a constructed REPLY
217  * using the CcSender; it will only reach the original sender Client.
218  * ALL REPLYs sent via a Sender need to be sent out using this interface to avoid
219  * undefined behavior.
220  *
221  * @tparam M the Message type wrapped in the REPLY<> template
222  * example usage:
223  * @code
224  * REPLY<Response> resp(req, req.request); //see REPLY<> ctor
225  * myCtx.sendReply(resp);
226  * @endcode
227  *
228  * @param reply (rvalue) the REPLY (previously constructed from a REQUEST) to send out
229  */
230  template <MessageC M>
231  void
232  sendReply(REPLY<M> && reply) {
233  (static_cast<CcSender*>(this))->send(std::forward<REPLY<M>>(reply));
234  }
235 
236  /**
237  * @brief part of the synchronous request reply interface. Send a constructed REPLY
238  * using the CcSender; it will only reach the original sender Client.
239  * ALL REPLYs sent via a Sender need to be sent out using this interface to avoid
240  * undefined behavior.
241  *
242  * @tparam M the Message type wrapped in the REPLY<> template
243  * example usage:
244  * @code
245  * REPLY<Response> resp(req, req.request); //see REPLY<> ctor
246  * myCtx.sendReply(resp);
247  * @endcode
248  *
249  * @param reply (lvalue) the REPLY (previously constructed from a REQUEST) to send out
250  */
251  template <MessageC M>
252  void
253  sendReply(REPLY<M> const& reply) {
254  static_cast<CcSender*>(this)->send(reply);
255  }
256 };
257 
258 }}
259 
260 
bool requestReply(RequestT &&request, Sender &sender, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout)
not an interface exposed to end user
Definition: RequestReply.hpp:142
Definition: Topic.hpp:44
template that convert a regular message to be a reply used in request / reply sync messaging - see Re...
Definition: Message.hpp:363
void sendReply(REPLY< M > &&reply)
part of the synchronous request reply interface. Send a constructed REPLY using the CcSender; it will only reach the original sender Client. ALL REPLYs sent via a Sender need to be sent out using this interface to avoid undefined behavior.
Definition: RequestReply.hpp:232
synchronous request reply interface for the Network Sender
Definition: RequestReply.hpp:168
void sendReply(REPLY< M > const &reply)
part of the synchronous request reply interface. Send a constructed REPLY using the CcSender; it will only reach the original sender Client. ALL REPLYs sent via a Sender need to be sent out using this interface to avoid undefined behavior.
Definition: RequestReply.hpp:253
a std tuple holding REPLY messages types it can dispatch
Definition: Message.hpp:78
A special type of message only used on the receiving side.
Definition: Message.hpp:182
bool requestReply(RequestT &&request, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout=time::Duration::seconds(0xffffffff))
The caller is blocked until the replies to this request are heard and processed. Just like regular Me...
Definition: RequestReply.hpp:201
Definition: Uuid.hpp:10
Definition: Message.hpp:112
Definition: Time.hpp:133
Definition: Message.hpp:377
Exception that just has an exit code.
Definition: Exception.hpp:28
a std tuple holding messages types it can dispatch except REPLYs
Definition: BlockingContext.hpp:183
Definition: Base.hpp:13
a std tuple holding REQUEST messages types it can dispatch