hmbdc
simplify-high-performance-messaging-programming
 All Classes Namespaces Functions Variables Friends Pages
BlockingContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Message.hpp"
4 #include "hmbdc/app/RequestReply.hpp"
5 #include "hmbdc/app/Logger.hpp"
6 #include "hmbdc/app/Context.hpp"
7 #include "hmbdc/time/Timers.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/pattern/BlockingBuffer.hpp"
10 #include "hmbdc/os/Thread.hpp"
11 #include "hmbdc/MetaUtils.hpp"
12 #include "hmbdc/Exception.hpp"
13 
14 #include <type_traits>
15 #include <tuple>
16 #include <vector>
17 #include <thread>
18 #include <mutex>
19 #include <stdexcept>
20 
21 namespace hmbdc { namespace app {
22 namespace blocking_context_detail {
23 
/// Generic flavour of tm_runner: invoking it on a client does nothing.
/// runOnceImpl / runOnceLoadSharingImpl pick this one when the client type
/// does not derive from time::TimerManager.
template <bool is_timer_manager>
struct tm_runner {
    template <typename Client>
    void operator()(Client&) {}
};
29 
30 template <>
31 struct tm_runner<true> {
32  void operator()(time::TimerManager& tm) {
33  tm.checkTimers(time::SysTime::now());
34  }
35 };
36 
/// Base case of the recursive aggregation over the Context's message tuples:
/// with no tuple left to inspect, every aggregated property is the empty tuple.
template <typename... MessageTuples>
struct context_property_aggregator {
    using Interests = std::tuple<>;
    using Requests = std::tuple<>;
    using Replies = std::tuple<>;
};
43 
44 template <typename MessageTuple, typename... MessageTuples>
45 struct context_property_aggregator<MessageTuple, MessageTuples ...> {
46  using Interests = typename hmbdc::merge_tuple_unique<
47  typename templatized_subtractor<REPLY, MessageTuple>::type
48  , typename context_property_aggregator<MessageTuples ...>::Interests
49  >::type;
50 
51  using Requests = typename hmbdc::merge_tuple_unique<
52  typename templatized_aggregator<REQUEST, MessageTuple>::type
53  , typename context_property_aggregator<MessageTuples ...>::Requests
54  >::type;
55 
56  using Replies = typename merge_tuple_unique<
57  typename templatized_aggregator<REPLY, MessageTuple>::type
58  , typename context_property_aggregator<MessageTuples ...>::Replies
59  >::type;
60 };
61 
62 
63 template <typename T>
64 typename std::enable_if<std::is_base_of<time::TimerManager, T>::value, time::Duration>::type
65 waitDuration(T const& tm, time::Duration maxBlockingTime) {
66  return std::min(tm.untilNextFire(), maxBlockingTime);
67 }
68 
69 template <typename T>
70 typename std::enable_if<!std::is_base_of<time::TimerManager, T>::value, time::Duration>::type
71 waitDuration(T const&, time::Duration maxBlockingTime) {
72  return maxBlockingTime;
73 }
74 
75 template <typename CcClient>
76 bool runOnceImpl(bool& HMBDC_RESTRICT stopped
77  , pattern::BlockingBuffer* HMBDC_RESTRICT buf
78  , CcClient& HMBDC_RESTRICT c, time::Duration maxBlockingTime) {
80  try {
81  tm_runner<std::is_base_of<time::TimerManager, CcClient>::value> tr;
82  tr(c);
83 
84  const bool clientParticipateInMessaging =
85  std::decay<CcClient>::type::INTERESTS_SIZE != 0;
86  if (clientParticipateInMessaging && buf) {
87  uint64_t count = buf->peek(begin, end);
88  auto b = begin;
89  c.CcClient::handleRangeImpl(b, end, 0xffff);
90  c.CcClient::invokedCb(0xffff);
91  buf->wasteAfterPeek(count);
92  if (!count) {
93  buf->waitItem(waitDuration(c, maxBlockingTime));
94  }
95  } else {
96  c.CcClient::invokedCb(0xffffu);
97  auto d = waitDuration(c, maxBlockingTime);
98  if (d) usleep(d.microseconds());
99  }
100  } catch (std::exception const& e) {
101  if (!stopped) {
102  c.stopped(e);
103  return !c.dropped();
104  }
105  } catch (int code) {
106  if (!stopped) {
107  c.stopped(ExitCode(code));
108  return !c.dropped();
109  }
110  } catch (...) {
111  if (!stopped) {
112  c.stopped(UnknownException());
113  return !c.dropped();
114  }
115  }
116  return true;
117 }
118 
119 
120 template <typename CcClient>
121 bool runOnceLoadSharingImpl(bool& HMBDC_RESTRICT stopped
122  , pattern::BlockingBuffer* HMBDC_RESTRICT buf
123  , CcClient& HMBDC_RESTRICT c, time::Duration maxBlockingTime) {
124  pattern::BlockingBuffer::iterator begin, end;
125  try {
126  tm_runner<std::is_base_of<time::TimerManager, CcClient>::value> tr;
127  tr(c);
128 
129  const bool clientParticipateInMessaging =
130  std::decay<CcClient>::type::INTERESTS_SIZE != 0;
131  if (clientParticipateInMessaging && buf) {
132  uint8_t msgWrapped[buf->maxItemSize()] __attribute__((aligned (8)));
133  void* tmp = msgWrapped;
134  if (buf->tryTake(tmp, 0, waitDuration(c, maxBlockingTime))) {
135  MessageDispacher<CcClient, typename CcClient::Interests>()(
136  c, *static_cast<MessageHead*>(tmp));
137  }
138  c.CcClient::invokedCb(0xffffu);
139  } else {
140  c.CcClient::invokedCb(0xffffu);
141  auto d = waitDuration(c, maxBlockingTime);
142  if (d) usleep(d.microseconds());
143  }
144  } catch (std::exception const& e) {
145  if (!stopped) {
146  c.stopped(e);
147  return !c.dropped();
148  }
149  } catch (int code) {
150  if (!stopped) {
151  c.stopped(ExitCode(code));
152  return !c.dropped();
153  }
154  } catch (...) {
155  if (!stopped) {
156  c.stopped(UnknownException());
157  return !c.dropped();
158  }
159  }
160  return true;
161 }
162 } //blocking_context_detail
163 
164 /**
165  * @class BlockingContext<>
166  * @brief A BlockingContext is like a media object that facilitates the communications
167  * for the Clients that it is holding. Each Client is powered by a single OS thread.
168  * a Client needs to be started once and only once to a single BlockingContext
169  * before any messages sending happens - typically in the initialization stage in main(),
170  * undefined behavior otherwise.
171  * @details a Client running in such a BlockingContext utilizing OS's blocking mechanism
172  * and takes less CPU time. The Client's responding time scales better when the number of
173  * Clients greatly exceeds the available CPUs in the system and the effective message rate
174  * for a Client tends to be low.
175  * @tparam MessageTuples std tuple capturing the Messages that the Context is supposed to
176  * deliver. Messages that are not listed here are silently dropped to ensure loose coupling
177  * between senders and receivers
178  */
179 template <MessageTupleC... MessageTuples>
181 private:
183  template <typename Message> struct can_handle;
184 public:
185  /**
186  * @class Interests
187  * @brief a std tuple holding messages types it can dispatch except REPLYs
188  * @details will not compile if using the Context to deliver a message not in this tuple
189  *
190  */
191  using Interests = typename cpa::Interests;
192 
193  /**
194  * @class Requests
195  * @brief a std tuple holding REQUEST messages types it can dispatch
196  * @details will not compile if using the Context to deliver a REQUEST not in this tuple
197  *
198  */
199  using Requests = typename cpa::Requests;
200 
201  /**
202  * @class Replies
203  * @brief a std tuple holding REPLY messages types it can dispatch
204  * @details will not compile if using the Context to deliver a REPLY not in this tuple
205  *
206  */
207  using Replies = typename cpa::Replies;
208 
209 
210  struct deliverAll {
211  template <typename T>
212  bool operator()(T&&) {return true;}
213  };
214 
215  /**
216  * @brief trivial ctor
217  */
218  template <typename U = Requests>
219  BlockingContext(typename std::enable_if<
220  std::tuple_size<U>::value == 0, void>::type* = nullptr)
221  : stopped_(false) {}
222 
223  template <typename U = Requests>
224  BlockingContext(typename std::enable_if<
225  std::tuple_size<U>::value, void>::type* = nullptr)
226  : stopped_(false) {
227  rrCtx_.start(1, 0);
228  }
229 
230 
231  BlockingContext(BlockingContext const&) = delete;
232  BlockingContext& operator = (BlockingContext const&) = delete;
233 
234  /**
235  * @brief start a client within the Context. The client is powered by a single OS thread.
236  * @details it is ok if a Client is blocking, if its own buffer is not full, it
237  * doesn't affect other Client's capabilities of receiving Messages
238  *
239  * @tparam Client actual Client type
240  * @tparam DeliverPred a condition functor deciding if a message should be delivered to a
241  * Client, which provides filtering before the Message type filtering. It improves performance
242  * in the way it could potentially reduce the unblocking times of thread
243  *
244  * Usage example:
245  * //the following Pred verifies srcId matches for Response, and let all other Messages types
246  * //deliver
247  * @code
248  * struct Pred {
249  * Pred(uint16_t srcId)
250  * : srcId(srcId){}
251  *
252  * bool operator()(Response const& resp) {
253  * return resp.srcId == srcId;
254  * }
255  * template <typename M>
256  * bool operator()(M const&) {return true;}
257  *
258  * uint16_t srcId;
259  * };
260  *
261  * @endcode
262  *
263  * @param c Client
264  * @param capacity the maximum messages this Client can buffer
265  * @param maxItemSize the max size of a message - when the Client is interested
266  * in JustBytes, this value MUST be big enough to hold the max size message to
267  * avoid truncation
268  * @param cpuAffinity cpu affinity mask for this Client's thread
269  * @param maxBlockingTime it is recommended to limit the duration a Client blocks due to
270  * no messages to handle, so it can respond to things like Context is stopped, or generate
271  * heartbeats if applicable.
272  * @param pred see DeliverPred documentation
273  */
274  template <typename Client, typename DeliverPred = deliverAll>
275  void start(Client& c
276  , size_t capacity = 1024
278  , uint64_t cpuAffinity = 0
279  , time::Duration maxBlockingTime = time::Duration::seconds(1)
280  , DeliverPred&& pred = DeliverPred()) {
282  HMBDC_THROW(std::out_of_range, "maxItemSize is too small");
283  std::shared_ptr<Transport> t(new Transport(maxItemSize, capacity));
285  msgConduits_, t, std::forward<DeliverPred>(pred));
286  auto thrd = kickOffClientThread(blocking_context_detail::runOnceImpl<Client>
287  , c, t.use_count() == 1?nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
288  threads_.push_back(move(thrd));
289  }
290 
291  /**
292  * @brief start a group of clients within the Context that collectively processing
293  * messages in a load sharing manner. Each client is powered by a single OS thread
294  * @details it is ok if a Client is blocking, if its own buffer is not full, it
295  * doesn't affect other Client's capabilities of receiving Messages
296  *
297  * @tparam LoadSharingClientPtrIt iterator to a Client pointer
298  * @tparam DeliverPred a condition functor deciding if a message should be delivered to these
299  * Clients, which provides filtering before the Message type filtering. It improves performance
300  * in the way it could potentially reduce the unblocking times of threads.
301  * An exception is for Client that are interested in JustBytes (basically every message),
302  * this filter does not apply - no filtering for these Clients
303  *
304  * @param begin an iterator, **begin should produce a Client& for the first Client
305  * @param end an end iterator, [begin, end) is the range for Clients
306  * @param capacity the maximum messages this Client can buffer
307  * @param maxItemSize the max size of a message - when the Client is interested
308  * in JustBytes, this value MUST be big enough to hold the max size message to
309  * avoid truncation
310  * @param cpuAffinity cpu affinity mask for this Client's thread
311  * @param maxBlockingTime it is recommended to limit the duration a Client blocks due to
312  * no messages to handle, so it can respond to things like Context is stopped, or generate
313  * heartbeats if applicable.
314  * @param pred see DeliverPred documentation
315  */
316  template <typename LoadSharingClientPtrIt, typename DeliverPred = deliverAll>
317  void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end
318  , size_t capacity = 1024
319  , size_t maxItemSize = max_size_in_tuple<
320  typename std::decay<decltype(**LoadSharingClientPtrIt())>::type::Interests
321  >::value
322  , uint64_t cpuAffinity = 0
323  , time::Duration maxBlockingTime = time::Duration::seconds(1)
324  , DeliverPred&& pred = DeliverPred()) { //=[](auto&&){return true;}
325  using Client = typename std::decay<
326  decltype(**LoadSharingClientPtrIt())
327  >::type;
329  HMBDC_THROW(std::out_of_range, "maxItemSize is too small");
330  if (begin == end) return;
331  std::shared_ptr<Transport> t(new Transport(maxItemSize, capacity));
333  msgConduits_, t, std::forward<DeliverPred>(pred));
334  while(begin != end) {
335  auto thrd = kickOffClientThread(blocking_context_detail::runOnceLoadSharingImpl<Client>
336  , **begin++, t.use_count() == 1?nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
337  threads_.push_back(std::move(thrd));
338  }
339  }
340 
341  /**
342  * @brief stop the message dispatching - asynchronously
343  * @details asynchronously means not garanteed message dispatching
344  * stops immidiately after this non-blocking call
345  */
346  void stop() {
347  __atomic_thread_fence(__ATOMIC_ACQUIRE);
348  stopped_ = true;
349  }
350 
351  /**
352  * @brief wait until all threads of the Context exit
353  * @details blocking call
354  */
355  void join() {
356  for (auto& t : threads_) {
357  t.join();
358  }
359  threads_.clear();
360  }
361 
362  /**
363  * @brief send a message to the BlockingContext to dispatch
364  * @details only the Clients that handles the Message will get it
365  * This function is threadsafe, which means you can call it anywhere in the code
366  *
367  * @tparam Message type
368  * @param m message
369  */
370  template <MessageC Message>
371  typename std::enable_if<can_handle<Message>::match, void>::type
372  send(Message&& m) {
373  justBytesSend(m);
374  using M = typename std::decay<Message>::type;
375  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
376  static_assert(i < std::tuple_size<MsgConduits>::value, "unexpected message type");
377  auto& entries = std::get<i>(msgConduits_);
378  for (auto i = 0u; i < entries.size(); ++i) {
379  auto& e = entries[i];
380  if (e.diliverPred(m)) {
381  if (i == entries.size() - 1) {
382  e.transport->buffer
383  .template putInPlace<MessageWrap<M>>(std::forward<Message>(m));
384  } else {
385  e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
386  }
387  }
388  }
389  }
390  template <MessageC Message>
391  typename std::enable_if<!can_handle<Message>::match, void>::type
392  send(Message&& m){
393  justBytesSend(std::forward<Message>(m));
394  if (!can_handle<Message>::justbytes) {
395  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
396  , std::decay<Message>::type::typeTag);)
397  }
398  }
399 
400  /**
401  * @brief send a message by directly constructing the message in receiving message queue
402  * @details since the message only exists after being delivered, the DeliverPred
403  * is not called to decide if it should be delivered
404  *
405  * @param args ctor args
406  * @tparam Message type
407  * @tparam typename ... Args args
408  */
409  template <MessageC Message, typename ... Args>
410  typename std::enable_if<can_handle<Message>::match, void>::type
411  sendInPlace(Args&&... args) {
412  justBytesSendInPlace<Message>(args...);
413  using M = typename std::decay<Message>::type;
414  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
415  auto& entries = std::get<i>(msgConduits_);
416  for (auto i = 0u; i < entries.size(); ++i) {
417  auto& e = entries[i];
418  if (i == entries.size() - 1) {
419  e.transport->buffer
420  .template putInPlace<MessageWrap<M>>(std::forward<Args>(args)...);
421  } else {
422  e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
423  }
424  }
425  }
426  template <MessageC Message, typename ... Args>
427  typename std::enable_if<!can_handle<Message>::match, void>::type
428  sendInPlace(Args&& ...args) {
429  justBytesSendInPlace<Message>(std::forward<Args>(args)...);
430  if (!can_handle<Message>::justbytes) {
431  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
432  , std::decay<Message>::type::typeTag);)
433  }
434  }
435 
436  /**
437  * @brief best effort to send a message via the BlockingContext
438  * @details this method is not recommended if more than one recipient is accepting
439  * this message since there is no guarantee each one will receive it once and only once.
440  * this call does not block - return false when deliver doesn't reach all
441  * intended recipients
442  * This method is threadsafe, which means you can call it anywhere in the code
443  *
444  * @param m message
445  * @param timeout return false if cannot deliver in the specified time
446  * @return true if send successfully to every intended receiver
447  */
448  template <MessageC Message>
449  typename std::enable_if<can_handle<Message>::match, bool>::type
450  trySend(Message&& m, time::Duration timeout = time::Duration::seconds(0)) {
451  if (!justBytesTrySend(m)) return false;
452  using M = typename std::decay<Message>::type;
453  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
454  auto& entries = std::get<i>(msgConduits_);
455  for (auto i = 0u; i < entries.size(); ++i) {
456  auto& e = entries[i];
457  if (e.diliverPred(m)) {
458  if (i == entries.size() - 1) {
459  if (!e.transport->buffer
460  .tryPut(MessageWrap<M>(std::forward<Message>(m)), timeout)) {
461  return false;
462  }
463  } else {
464  if (!e.transport->buffer.tryPut(MessageWrap<M>(m), timeout)) {
465  return false;
466  }
467  }
468  }
469  }
470  return true;
471  }
472 
473  template <MessageC Message>
474  typename std::enable_if<!can_handle<Message>::match, bool>::type
475  trySend(Message&& m, time::Duration = time::Duration::seconds(0)) {
476  if (!can_handle<Message>::justbytes) {
477  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
478  , std::decay<Message>::type::typeTag);)
479  }
480 
481  return justBytesTrySend(std::forward<Message>(m));
482  }
483 
484 
485  /**
486  * @brief best effort to send a message by directly constructing the message in
487  * receiving message queue
488  * @details since the message only exists after being delivered, the DeliverPred
489  * is not called to decide if it should be delivered
490  * this method is not recommended if more than one recipients are accepting
491  * this message since it is hard to ensure each message is delivered once and only once.
492  * this call does not block - return false when delivery doesn't reach ALL
493  * intended recipients
494  * This method is threadsafe, which means you can call it anywhere in the code
495  *
496  * @return true if send successfully to every intended receiver
497  */
498  template <MessageC Message, typename ... Args>
499  typename std::enable_if<can_handle<Message>::match, bool>::type
500  trySendInPlace(Args&&... args) {
501  if (!justBytesTrySendInPlace<Message>(args...)) return false;
502  using M = typename std::decay<Message>::type;
503  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
504  auto& entries = std::get<i>(msgConduits_);
505  for (auto i = 0u; i < entries.size(); ++i) {
506  auto& e = entries[i];
507  if (i == entries.size() - 1) {
508  if (!e.transport->buffer.template
509  tryPutInPlace<MessageWrap<M>>(std::forward<Args>(args)...)) {
510  return false;
511  }
512  } else {
513  if (!e.transport->buffer.template
514  tryPutInPlace<MessageWrap<M>>(args...)) {
515  return false;
516  }
517  }
518  }
519  return true;
520  }
521 
522  template <MessageC Message, typename ... Args>
523  typename std::enable_if<!can_handle<Message>::match, bool>::type
524  trySendInPlace(Args&&... args) {
525  if (!can_handle<Message>::justbytes) {
526  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
527  , std::decay<Message>::type::typeTag);)
528  }
529 
530  return justBytesTrySendInPlace<Message>(args...);
531  }
532 
533  /**
534  * @brief send a range of messages via the BlockingContext
535  * - only checking with diliverPred using the first message
536  * @details only the Clients that handles the Message will get it of course
537  * This function is threadsafe, which means you can call it anywhere in the code
538  *
539  * @param begin a forward iterator point at the start of the range
540  * @param n length of the range
541  */
542  template <MessageForwardIterC ForwardIt>
543  typename std::enable_if<can_handle<decltype(*(ForwardIt()))>::match, void>::type
544  send(ForwardIt begin, size_t n) {
545  if (!n) return;
546  justBytesSend(begin, n);
547  using Message = decltype(*begin);
548  using M = typename std::decay<Message>::type;
549  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
550  auto& entries = std::get<i>(msgConduits_);
551  for (auto i = 0u; i < entries.size(); ++i) {
552  auto& e = entries[i];
553  if (e.diliverPred(*begin)) {
554  while (!e.transport->buffer.template
555  tryPutBatchInPlace<MessageWrap<M>>(begin, n)) {}
556  }
557  }
558  }
559 
560  template <MessageForwardIterC ForwardIt>
561  typename std::enable_if<!can_handle<decltype(*(ForwardIt()))>::match, void>::type
562  send(ForwardIt begin, size_t n) {
563  if (!can_handle<decltype(*(ForwardIt()))>::justbytes) {
564  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
565  , std::decay<decltype(*(ForwardIt()))>::type::typeTag);)
566  }
567  justBytesSend(begin, n);
568  }
569 
570  /**
571  * @brief part of synchronous request reply interface. The caller is blocked until
572  * the replies to this request are heard and processed. Just like regular Messages,
573  * REQUEST is sent out to Clients that are interested in this Context
574  *
575  * @details
576  *
577  * @tparam MessageC Message the type wrapped in REQUEST<> template
578  * @tparam typename ReplyHandler type for how to handle the replies.
579  * example of a handler expect two steps replies for requestReply call:
580  * @code
581  * struct ClientAlsoHandlesReplies
582  * : Client<ClientAlsoHandlesReplies, REPLY<Reply0>, REPLY<Reply1>, OtherMsg> {
583  * void handleReplyCb(Reply0 const&){
584  * // got step 1 reply
585  * }
586  * void handleReplyCb(Reply1 const&){
587  * // got step 2 reply
588  * throw hmbdc::ExitCode(0); // all replies received, now
589  * // throw to unblock the requestReply call
590  * }
591  * @endcode
592  *
593  * @param request the REQUEST sent out
594  * @param replyHandler the callback logic to handle replies
595  * @param timeout timeout value
596  * @return true if replies are received and handled, false if timedout
597  */
598  template <MessageC M, typename ReplyHandler>
599  bool
600  requestReply(REQUEST<M>&& request, ReplyHandler& replyHandler
601  , time::Duration timeout = time::Duration::seconds(0xffffffff)) {
602  static_assert(std::tuple_size<Requests>::value != 0
603  , "no REQUEST is declared for this Context");
604  return hmbdc::app::requestReply(std::forward<REQUEST<M>>(request)
605  , *this
606  , rrCtx_
607  , replyHandler
608  , timeout);
609  }
610 
611  /**
612  * @brief part of synchronous request reply interface. Send a constructed REPLY
613  * to the Context, it will only reach the original sender Client.
614  * ALL REPLYs need to be sent out using this interface to avoid
615  * undefined behavior.
616  *
617  * @tparam MessageC Message the type wrapped in REPLY<> template
618  * @param reply the rvalue REPLY (previously constructed from a REQUEST) to sent out
619  */
620  template <ReplyC ReplyT>
621  void
622  sendReply(ReplyT &&reply) {
623  using R = typename std::decay<ReplyT>::type;
624  static_assert(std::is_trivially_destructible<R>::value, "unsupported");
625  static_assert(std::tuple_size<Requests>::value != 0
626  , "no REQUEST is declared for this Context, no one would be waiting");
627  rrCtx_.send(std::forward<ReplyT>(reply));
628  }
629 
630 private:
631  void start(){}
632 
633  struct Transport {
634  Transport(size_t maxItemSize, size_t capacity)
635  : buffer(maxItemSize + sizeof(MessageHead), capacity){}
637  };
638 
639  template <typename Message>
640  struct TransportEntry {
641  std::shared_ptr<Transport> transport;
642  std::function<bool (Message const&)> diliverPred;
643  };
644 
645  template <typename Message>
646  using TransportEntries = std::vector<TransportEntry<Message>>;
647 
648  template <typename Tuple> struct MCGen;
649  template <typename ...Messages>
650  struct MCGen<std::tuple<Messages ...>> {
651  using type = std::tuple<TransportEntries<Messages> ...>;
652  };
653 
654  using MsgConduits = typename MCGen<Interests>::type;
655 
656  MsgConduits msgConduits_;
657  using Threads = std::vector<std::thread>;
658  Threads threads_;
659  bool stopped_;
660  typename std::conditional<std::tuple_size<Requests>::value
663  >
664  , void*
665  >::type rrCtx_;
666 
667  template <typename MsgConduits, typename DeliverPred, typename Tuple>
668  struct setupConduit {
669  void operator()(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&){}
670  };
671 
672  struct createEntry {
673  template <bool careMsg, size_t i, typename M, typename MsgConduits, typename DeliverPred>
674  typename std::enable_if<careMsg, void>::type
675  doOrNot(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred) {
676  auto& entries = std::get<i>(msgConduits);
677  entries.emplace_back(TransportEntry<M>{t, pred});
678  }
679 
680  template <bool careMsg, size_t i, typename M, typename MsgConduits, typename DeliverPred>
681  typename std::enable_if<!careMsg, void>::type
682  doOrNot(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&) {
683  }
684  };
685 
686  template <typename MsgConduits, typename DeliverPred, typename M, typename ...Messages>
687  struct setupConduit<MsgConduits, DeliverPred, std::tuple<M, Messages...>> {
688  void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred){
689  auto constexpr i = index_in_tuple<M, Interests>::value;
690  createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
691  msgConduits, t, pred);
692  setupConduit<MsgConduits, DeliverPred, std::tuple<Messages...>>()
693  (msgConduits,t, std::forward<DeliverPred>(pred));
694  }
695  };
696 
697  template <typename MsgConduits, typename M, typename ...Messages>
698  struct setupConduit<MsgConduits, void*, std::tuple<M, Messages...>> {
699  void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t, void*){
700  auto constexpr i = index_in_tuple<M, Interests>::value;
701  createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
702  msgConduits, t, [](M const&){return true;});
703  setupConduit<MsgConduits, void*, std::tuple<Messages...>>()
704  (msgConduits, t, nullptr);
705  }
706  };
707 
708  template <typename Message>
709  struct can_handle {
710  private:
711  using M = typename std::decay<Message>::type;
712  public:
713  enum {
714  index = index_in_tuple<TransportEntries<M>, MsgConduits>::value,
715  match = index < std::tuple_size<MsgConduits>::value,
716  justbytes_index = index_in_tuple<TransportEntries<JustBytes>, MsgConduits>::value,
717  justbytes = justbytes_index < std::tuple_size<MsgConduits>::value
718  && std::is_trivially_destructible<M>::value? 1 :0,
719  };
720  };
721 
722  template <typename Client, typename RunOnceFunc>
723  auto kickOffClientThread(RunOnceFunc runOnceFunc, Client& c, pattern::BlockingBuffer* buffer
724  , uint64_t mask, time::Duration maxBlockingTime) {
725  if (buffer == nullptr) {
726  HMBDC_LOG_ONCE(HMBDC_LOG_D("starting a Client that cannot receive messages, Client typid()="
727  , typeid(Client).name());)
728  }
729  std::thread thrd([
730  this
731  , &c
732  , buffer
733  , mask
734  , maxBlockingTime
735  , runOnceFunc]() {
736  std::string name;
737  char const* schedule;
738  int priority;
739 
740  if (c.hmbdcName()) {
741  name = c.hmbdcName();
742  } else {
743  name = "hmbdc-b";
744  }
745  auto cpuAffinityMask = mask;
746  std::tie(schedule, priority) = c.schedSpec();
747 
748  if (!schedule) schedule = "SCHED_OTHER";
749 
750  os::configureCurrentThread(name.c_str(), cpuAffinityMask
751  , schedule, priority);
752  c.messageDispatchingStartedCb(0xffff);
753 
754  while(!stopped_ && runOnceFunc(this->stopped_, buffer, c, maxBlockingTime)) {
755  }
756  if (this->stopped_) c.dropped();
757  }
758  );
759 
760  return thrd;
761  }
762 
763 
764  template <MessageC Message>
765  typename std::enable_if<can_handle<Message>::justbytes, void>::type
766  justBytesSend(Message&& m) {
767  using M = typename std::decay<Message>::type;
768  auto constexpr i = can_handle<Message>::justbytes_index;
769  auto& entries = std::get<i>(msgConduits_);
770  for (auto i = 0u; i < entries.size(); ++i) {
771  auto& e = entries[i];
772  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
773  e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
774  } else {
775  HMBDC_THROW(std::out_of_range
776  , "need to increase maxItemSize in start() to >= "
777  << sizeof(MessageWrap<M>));
778  }
779  }
780  }
781 
782  template <MessageC Message>
783  typename std::enable_if<!can_handle<Message>::justbytes, void>::type
784  justBytesSend(Message&& m) {
785  }
786 
787  template <MessageC Message, typename ... Args>
788  typename std::enable_if<can_handle<Message>::justbytes, void>::type
789  justBytesSendInPlace(Args&& ...args) {
790  using M = typename std::decay<Message>::type;
791  auto constexpr i = can_handle<Message>::justbytes_index;
792  auto& entries = std::get<i>(msgConduits_);
793  for (auto i = 0u; i < entries.size(); ++i) {
794  auto& e = entries[i];
795  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
796  e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
797  } else {
798  HMBDC_THROW(std::out_of_range
799  , "need to increase maxItemSize in start() to >= "
800  << sizeof(MessageWrap<M>));
801  }
802  }
803  }
804 
805  template <MessageC Message, typename ... Args>
806  typename std::enable_if<!can_handle<Message>::justbytes, void>::type
807  justBytesSendInPlace(Args&&... args) {
808  }
809 
810  template <MessageC Message>
811  typename std::enable_if<can_handle<Message>::justbytes, bool>::type
812  justBytesTrySend(Message&& m, time::Duration timeout) {
813  using M = typename std::decay<Message>::type;
814  auto constexpr i = can_handle<Message>::justbytes_index;
815  auto& entries = std::get<i>(msgConduits_);
816  for (auto i = 0u; i < entries.size(); ++i) {
817  auto& e = entries[i];
818  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
819  if (!e.transport->buffer.tryPut(MessageWrap<M>(m), timeout)) {
820  return false;
821  }
822  } else {
823  HMBDC_THROW(std::out_of_range
824  , "need to increase maxItemSize in start() to >= "
825  << sizeof(MessageWrap<M>));
826  }
827  }
828  return true;
829  }
830  template <MessageC Message>
831  typename std::enable_if<!can_handle<Message>::justbytes, bool>::type
832  justBytesTrySend(Message&& m, time::Duration timeout) {
833  return true;
834  }
835 
836  template <MessageC Message, typename ... Args>
837  typename std::enable_if<can_handle<Message>::justbytes, bool>::type
838  justBytesTrySendInPlace(Args&&... args) {
839  using M = typename std::decay<Message>::type;
840  auto constexpr i = can_handle<Message>::justbytes_index;
841  auto& entries = std::get<i>(msgConduits_);
842  for (auto i = 0u; i < entries.size(); ++i) {
843  auto& e = entries[i];
844  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
845  if (!e.transport->buffer.template
846  tryPutInPlace<MessageWrap<M>>(args...)) {
847  return false;
848  }
849  } else {
850  HMBDC_THROW(std::out_of_range
851  , "need to increase maxItemSize in start() to >= "
852  << sizeof(MessageWrap<M>));
853  }
854  }
855  return true;
856  }
857  template <MessageC Message, typename ... Args>
858  typename std::enable_if<!can_handle<Message>::justbytes, bool>::type
859  justBytesTrySendInPlace(Args&&... args) {
860  return true;
861  }
862 
863  template <MessageForwardIterC ForwardIt>
864  typename std::enable_if<can_handle<decltype(*(ForwardIt()))>::justbytes, void>::type
865  justBytesSend(ForwardIt begin, size_t n) {
866  using Message = decltype(*begin);
867  using M = typename std::decay<Message>::type;
868  auto constexpr i = can_handle<decltype(*(ForwardIt()))>::justbytes_index;
869  auto& entries = std::get<i>(msgConduits_);
870  for (auto i = 0u; i < entries.size(); ++i) {
871  auto& e = entries[i];
872 
873  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
874  while (!e.transport->buffer.template tryPutBatchInPlace<MessageWrap<M>>(begin, n)) {}
875  } else {
876  HMBDC_THROW(std::out_of_range
877  , "need to increase maxItemSize in start() to >= "
878  << sizeof(MessageWrap<M>));
879  }
880  }
881  }
882 
883  template <MessageForwardIterC ForwardIt>
884  typename std::enable_if<!can_handle<decltype(*(ForwardIt()))>::justbytes, void>::type
885  justBytesSend(ForwardIt begin, size_t n) {
886  }
887 
888 };
889 }}
Definition: BlockingBuffer.hpp:16
Definition: MetaUtils.hpp:80
bool requestReply(RequestT &&request, Sender &sender, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout)
not an interface exposed to end user
Definition: RequestReply.hpp:140
std::enable_if< can_handle< decltype(*(ForwardIt()))>::match, void >::type send(ForwardIt begin, size_t n)
send a range of messages via the BlockingContext
Definition: BlockingContext.hpp:544
Definition: BlockingContext.hpp:633
Definition: Timers.hpp:66
Definition: BlockingContext.hpp:648
Definition: MetaUtils.hpp:44
std::enable_if< can_handle< Message >::match, void >::type send(Message &&m)
send a message to the BlockingContext to dispatch
Definition: BlockingContext.hpp:372
Definition: BlockingContext.hpp:640
Definition: BlockingContext.hpp:183
Definition: BlockingContext.hpp:210
a std tuple holding REPLY messages types it can dispatch
BlockingContext(typename std::enable_if< std::tuple_size< U >::value==0, void >::type *=nullptr)
trivial ctor
Definition: BlockingContext.hpp:219
A BlockingContext is like a media object that facilitates the communications for the Clients that it ...
Definition: BlockingContext.hpp:180
Definition: Message.hpp:78
Definition: BlockingContext.hpp:672
bool requestReply(REQUEST< M > &&request, ReplyHandler &replyHandler, time::Duration timeout=time::Duration::seconds(0xffffffff))
part of synchronous request reply interface. The caller is blocked until the replies to this request ...
Definition: BlockingContext.hpp:600
std::enable_if< can_handle< Message >::match, void >::type sendInPlace(Args &&...args)
send a message by directly constructing the message in receiving message queue
Definition: BlockingContext.hpp:411
void start(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a client within the Context. The client is powered by a single OS thread.
Definition: BlockingContext.hpp:275
Context template parameter indicating each message is sent to all clients within the Context...
Definition: Context.hpp:57
std::enable_if< can_handle< Message >::match, bool >::type trySendInPlace(Args &&...args)
best effort to send a message by directly constructing the message in receiving message queue ...
Definition: BlockingContext.hpp:500
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:477
void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename std::decay< decltype(**LoadSharingClientPtrIt())>::type::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a group of clients within the Context that collectively process messages in a load sharing m...
Definition: BlockingContext.hpp:317
void join()
wait until all threads of the Context exit
Definition: BlockingContext.hpp:355
void stop()
stop the message dispatching - asynchronously
Definition: BlockingContext.hpp:346
Definition: Message.hpp:112
Definition: Time.hpp:133
Definition: BlockingContext.hpp:668
Definition: MetaUtils.hpp:96
std::enable_if< can_handle< Message >::match, bool >::type trySend(Message &&m, time::Duration timeout=time::Duration::seconds(0))
best effort to send a message via the BlockingContext
Definition: BlockingContext.hpp:450
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:57
Definition: BlockingBuffer.hpp:253
a std tuple holding messages types it can dispatch except REPLYs
Definition: BlockingContext.hpp:183
Definition: BlockingContext.hpp:25
a std tuple holding REQUEST messages types it can dispatch
template that converts a regular message to be a request used in request / reply sync messaging - see ...
Definition: Message.hpp:339
void sendReply(ReplyT &&reply)
part of synchronous request reply interface. Send a constructed REPLY to the Context; it will only reach the original sender Client. ALL REPLYs need to be sent out using this interface to avoid undefined behavior.
Definition: BlockingContext.hpp:622