hmbdc
simplify-high-performance-messaging-programming
RequestReply.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/comm/Uuid.hpp"
5 #include "hmbdc/Exception.hpp"
6 
7 
8 #include <functional>
9 #include <mutex>
10 #include <tuple>
11 #include <memory>
12 #include <condition_variable>
13 
14 namespace hmbdc { namespace app {
15 
16 namespace request_reply_proxy_client_detail {
17 template <typename CcClient, typename Message>
18 struct dispatcher {
19  bool operator()(CcClient& c, MessageHead& m, comm::Uuid const& uuid) {
20  if (m.typeTag == Message::typeTag
21  && uuid == m.get<Message>().uuid) {
22  c.handleReplyCb(m.get<Message>().reply);
23  using Wrap = MessageWrap<Message>;
24  auto& wrap = static_cast<Wrap&>(m);
25  wrap.~Wrap();
26  return true;
27  }
28  return false;
29  }
30 };
31 
32 
33 template <typename CcClient>
34 struct dispatcher<CcClient, REPLY<JustBytes>> {
35  bool operator()(CcClient& c, MessageHead& m, comm::Uuid const& uuid) {
36  //only deliver user messages
37  if (m.typeTag > 1000 && uuid == m.get<REPLY<JustBytes>>().uuid) {
38  auto j = &m.get<REPLY<JustBytes>>().reply;
39  c.handleJustBytesReplyCb(m.typeTag, j);
40  }
41  return true;
42  }
43 };
44 
45 template <typename CcClient, typename ... Messages>
47  bool operator()(CcClient&, MessageHead&, comm::Uuid const&) const{return false;}
48 };
49 
50 template <typename CcClient, typename M, typename ... Messages>
51 struct ReplyDispatcher<CcClient, std::tuple<M, Messages...>> {
52  bool operator()(CcClient& c, MessageHead& e, comm::Uuid const& uuid) const {
53  if (!dispatcher<CcClient, M>()(c, e, uuid))
54  return ReplyDispatcher<CcClient, std::tuple<Messages ...>>()(c, e, uuid);
55  else
56  return true;
57  }
58 };
59 
/**
 * @brief state shared (through a std::shared_ptr) between a blocked
 * requestReply() caller and the proxy client working on its behalf
 */
struct SyncData {
    /// sends the request; assigned by requestReply() and invoked by the proxy
    /// when message dispatching starts
    std::function<void()> send;
    /// starts out false; not read or written by the code in this header section
    bool proxyReady{false};
    /// set to true by the proxy when reply handling ends with an exception;
    /// this is the condition the blocked caller waits for
    bool allRepliesReceived{false};
    /// held while allRepliesReceived is written and while it is waited on
    std::mutex mux;
    /// signalled after allRepliesReceived is set
    std::condition_variable cond;
};
67 
68 template <typename CcClient>
70  using Interests = typename CcClient::Replies;
71  using Replies = std::tuple<>;
72  using Requests = std::tuple<>;
73 
74  enum {
75  INTERESTS_SIZE = std::tuple_size<Interests>::value,
76  };
77  char const* hmbdcName() const { return nullptr; }
78  std::tuple<char const*, int> schedSpec() const {
79  return std::make_tuple<char const*, int>(nullptr, 0);
80  }
81  size_t maxBatchMessageCount() const {return std::numeric_limits<size_t>::max();}
82  void messageDispatchingStartedCb(uint16_t threadSerialNumber) {
83  syncData->send();
84  }
/// Does nothing with the exception it is given.
/// @param e not used
void stoppedCb(std::exception const& e) {
}
87  bool droppedCb() {delete this; return true;};
88  virtual ~RequestReplyProxyClient(){}
89 
90  RequestReplyProxyClient(CcClient& c)
91  : syncData(std::make_shared<SyncData>())
92  , uuid(comm::Uuid::generate())
93  , c_(c) {
94  }
95 
96  std::shared_ptr<SyncData> syncData;
97 
98  void invokedCb(uint16_t threadId) {
99  if (syncData.use_count() == 1) {
100  throw ExitCode(1);
101  }
102  }
103 
104  template <typename Iterator>
105  void handleRangeImpl(Iterator it,
106  Iterator end, uint16_t threadId) {
107  try {
108  for (;hmbdc_likely(it != end); ++it) {
110  c_, *static_cast<MessageHead*>(*it), uuid);
111  }
112  } catch (...) {
113  std::unique_lock<std::mutex> lk(syncData->mux);
114  syncData->allRepliesReceived = true;
115  syncData->cond.notify_one();
116  throw;
117  }
118  }
119 
120  const comm::Uuid uuid;
121 private:
122  CcClient& c_;
123 };
124 
126  comm::Uuid const* uuid;
127  template <typename ReplyT>
128  bool operator()(ReplyT&& t) {return t.uuid == *uuid;}
129 
130 };
131 
132 }
133 
134 /**
135  * @brief not an interface exposed to end user
136  */
137 template <RequestC RequestT, typename Sender, typename ReplyContext, typename ReplyHandler
138 >
139 bool
140 requestReply(RequestT &&request
141  , Sender& sender
142  , ReplyContext& ctx
143  , ReplyHandler& replyHandler
144  , time::Duration timeout) {
145  static_assert(ReplyContext::cpa::has_pool && !ReplyContext::cpa::pool_msgless
146  , "Contex needs to have a pool");
147  using namespace request_reply_proxy_client_detail;
148  auto proxy = new RequestReplyProxyClient<ReplyHandler>(replyHandler);
149  auto syncData = proxy->syncData;
150  request.uuid = proxy->uuid;
151  syncData->send = [&sender, &request]() {
152  sender.send(std::forward<RequestT>(request));
153  };
154  ctx.addToPool(*proxy);
155  std::unique_lock<std::mutex> lk(syncData->mux);
156  return syncData->cond.wait_for(lk, std::chrono::microseconds(timeout.microseconds())
157  , [syncData]() {return syncData->allRepliesReceived;});
158 }
159 
160 /**
161  * @brief synchronous request reply interface for the Network Sender
162  *
163  * @tparam CcSender concrete Sender
164  */
165 template <typename CcSender>
167  /**
168  * @brief The caller is blocked until the replies to this request are heard and
169  * processed. Just like regular Messages REQUEST is sent out to Clients through
170  * the CcSender
171  *
172  * @details
173  *
174  * @tparam MessageC Message the type wrapped in REQUEST<> template
175  * @tparam ReplyContext the Context Type that ReplyT messages need to be able
176  * to reach, it needs to have a pool
177  * @snippet blind-auction.cpp nework request reply example
178  * @tparam ReplyHandler type for how to handle the replies.
179  * example of a handler expect two steps replies for requestReply call:
180  * @code
181  * struct ClientAlsoHandlesReplies
182  * : Client<ClientAlsoHandlesReplies, REPLY<Reply0>, REPLY<Reply1>, OtherMsg> {
183  * void handleReplyCb(Reply0 const&){
184  * // got step 1 reply
185  * }
186  * void handleReplyCb(Reply1 const&){
187  * // got step 2 reply
188  * throw hmbdc::ExitCode(0); // all replies received, now
189  * // throw to unblock the requestReply call
190  * }
191  * @endcode
192  *
193  * @param request the REQUEST sent out
194  * @param ctx the replies are expected to show up in this Context which contains a pool
195  * @param replyHandler the callback logic to handle replies
196  */
197  template <RequestC RequestT, typename ReplyContext, typename ReplyHandler>
198  bool
199  requestReply(RequestT&& request
200  , ReplyContext& ctx
201  , ReplyHandler& replyHandler
202  , time::Duration timeout = time::Duration::seconds(0xffffffff)) {
203  static_assert(ReplyContext::cpa::has_pool && !ReplyContext::cpa::pool_msgless
204  , "not supported - needs to have a pool");
205  return hmbdc::app::requestReply(std::forward<RequestT>(request)
206  , *static_cast<CcSender*>(this)
207  , ctx
208  , replyHandler
209  , timeout);
210  }
211 
212 
213  /**
214  * @brief part of synchronous request reply interface. Send a constructed REPLY
215  * using the CcSender, it will only reach to the origianl sender Client.
216  * ALL REPLY via a Sender need to sent out using this interface to avoid
217  * undefined behavior.
218  *
219  * @tparam MessageC Message the type wrapped in REPLY<> template
220  * example usage:
221  * @code
222  * REPLY<Response> resp(req, req.request); //see REPLY<> ctor
223  * myCtx.sendReply(resp);
224  * @endcode
225  *
226  * @param reply (rvalue) the REPLY (previously constructed from a REQUEST) to sent out
227  */
228  template <MessageC M>
229  void
230  sendReply(REPLY<M> && reply) {
231  (static_cast<CcSender*>(this))->send(std::forward<REPLY<M>>(reply));
232  }
233 
234  /**
235  * @brief part of synchronous request reply interface. Send a constructed REPLY
236  * using the CcSender, it will only reach to the origianl sender Client.
237  * ALL REPLY via a Sender need to sent out using this interface to avoid
238  * undefined behavior.
239  *
240  * @tparam MessageC Message the type wrapped in REPLY<> template
241  * example usage:
242  * @code
243  * REPLY<Response> resp(req, req.request); //see REPLY<> ctor
244  * myCtx.sendReply(resp);
245  * @endcode
246  *
247  * @param reply (lvalue) the REPLY (previously constructed from a REQUEST) to sent out
248  */
249  template <MessageC M>
250  void
251  sendReply(REPLY<M> const& reply) {
252  static_cast<CcSender*>(this)->send(reply);
253  }
254 };
255 
256 }}
257 
258 
bool requestReply(RequestT &&request, Sender &sender, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout)
not an interface exposed to end user
Definition: RequestReply.hpp:140
Definition: TypedString.hpp:76
template that convert a regular message to be a reply used in request / reply sync messaging - see Re...
Definition: Message.hpp:363
void sendReply(REPLY< M > &&reply)
part of the synchronous request reply interface. Sends a constructed REPLY using the CcSender; it will only reach the original sender Client. ALL REPLYs via a Sender need to be sent out using this interface to avoid undefined behavior.
Definition: RequestReply.hpp:230
synchronous request reply interface for the Network Sender
Definition: RequestReply.hpp:166
void sendReply(REPLY< M > const &reply)
part of the synchronous request reply interface. Sends a constructed REPLY using the CcSender; it will only reach the original sender Client. ALL REPLYs via a Sender need to be sent out using this interface to avoid undefined behavior.
Definition: RequestReply.hpp:251
a std tuple holding REPLY messages types it can dispatch
Definition: Message.hpp:78
A special type of message only used on the receiving side.
Definition: Message.hpp:182
bool requestReply(RequestT &&request, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout=time::Duration::seconds(0xffffffff))
The caller is blocked until the replies to this request are heard and processed. Just like regular Me...
Definition: RequestReply.hpp:199
Definition: Uuid.hpp:10
Definition: Message.hpp:112
Definition: Time.hpp:133
Definition: Message.hpp:377
Exception that just has an exit code.
Definition: Exception.hpp:28
a std tuple holding messages types it can dispatch except REPLYs
Definition: BlockingContext.hpp:183
Definition: Base.hpp:13
a std tuple holding REQUEST messages types it can dispatch