hmbdc
simplify-high-performance-messaging-programming
BlockingContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Message.hpp"
4 #include "hmbdc/app/Logger.hpp"
5 #include "hmbdc/app/Context.hpp"
6 #include "hmbdc/time/Timers.hpp"
7 #include "hmbdc/time/Time.hpp"
8 #include "hmbdc/pattern/BlockingBuffer.hpp"
9 #include "hmbdc/os/Thread.hpp"
10 #include "hmbdc/MetaUtils.hpp"
11 #include "hmbdc/Exception.hpp"
12 
13 #include <type_traits>
14 #include <tuple>
15 #include <vector>
16 #include <thread>
17 #include <mutex>
18 #include <stdexcept>
19 
20 namespace hmbdc { namespace app {
21 namespace blocking_context_detail {
22 HMBDC_CLASS_HAS_DECLARE(hmbdc_ctx_queued_ts);
23 
24 template <typename Interests>
25 struct RealInterests {
26  private:
27  using no_trivial = typename filter_out_tuple<std::is_trivially_destructible
28  , Interests
29  >::type;
30  public:
31  /// avoid double delivery when JustBytes is involved
32  using type = typename std::conditional<
33  index_in_tuple<JustBytes, Interests>::value == std::tuple_size<Interests>::value
34  , Interests
37 };
38 
39 template <bool is_timer_manager>
40 struct tm_runner {
41  template<typename C>
42  void operator()(C&) {}
43 };
44 
45 template <>
46 struct tm_runner<true> {
47  void operator()(time::TimerManager& tm) {
48  tm.checkTimers(time::SysTime::now());
49  }
50 };
51 
52 template <typename... MessageTuples>
54  using Interests = std::tuple<>;
55 };
56 
57 template <typename MessageTuple, typename... MessageTuples>
58 struct context_property_aggregator<MessageTuple, MessageTuples ...> {
59  using Interests = typename hmbdc::merge_tuple_unique<
60  MessageTuple, typename context_property_aggregator<MessageTuples ...>::Interests
61  >::type;
62 };
63 
64 
65 template <typename T>
66 typename std::enable_if<std::is_base_of<time::TimerManager, T>::value, time::Duration>::type
67 waitDuration(T const& tm, time::Duration maxBlockingTime) {
68  return std::min(tm.untilNextFire(), maxBlockingTime);
69 }
70 
71 template <typename T>
72 typename std::enable_if<!std::is_base_of<time::TimerManager, T>::value, time::Duration>::type
73 waitDuration(T const&, time::Duration maxBlockingTime) {
74  return maxBlockingTime;
75 }
76 
77 template <typename CcClient>
78 bool runOnceImpl(bool& HMBDC_RESTRICT stopped
79  , pattern::BlockingBuffer* HMBDC_RESTRICT buf
80  , CcClient& HMBDC_RESTRICT c, time::Duration maxBlockingTime) {
82  try {
83  tm_runner<std::is_base_of<time::TimerManager, CcClient>::value> tr;
84  tr(c);
85 
86  const bool clientParticipateInMessaging =
87  std::decay<CcClient>::type::INTERESTS_SIZE != 0;
88  if (clientParticipateInMessaging && buf) {
89  uint64_t count = buf->peek(begin, end);
90  auto b = begin;
91  c.CcClient::invokedCb(c.CcClient::handleRangeImpl(b, end, 0xffff));
92  buf->wasteAfterPeek(count);
93  if (!count) {
94  buf->waitItem(waitDuration(c, maxBlockingTime));
95  }
96  } else {
97  c.CcClient::invokedCb(0);
98  auto d = waitDuration(c, maxBlockingTime);
99  if (d) usleep(d.microseconds());
100  }
101  } catch (std::exception const& e) {
102  if (!stopped) {
103  c.stopped(e);
104  return !c.dropped();
105  }
106  } catch (int code) {
107  if (!stopped) {
108  c.stopped(ExitCode(code));
109  return !c.dropped();
110  }
111  } catch (...) {
112  if (!stopped) {
113  c.stopped(UnknownException());
114  return !c.dropped();
115  }
116  }
117  return true;
118 }
119 
120 
121 template <typename CcClient>
122 bool runOnceLoadSharingImpl(bool& HMBDC_RESTRICT stopped
123  , pattern::BlockingBuffer* HMBDC_RESTRICT buf
124  , CcClient& HMBDC_RESTRICT c, time::Duration maxBlockingTime) {
125  pattern::BlockingBuffer::iterator begin, end;
126  try {
127  tm_runner<std::is_base_of<time::TimerManager, CcClient>::value> tr;
128  tr(c);
129 
130  const bool clientParticipateInMessaging =
131  std::decay<CcClient>::type::INTERESTS_SIZE != 0;
132  if (clientParticipateInMessaging && buf) {
133  uint8_t msgWrapped[buf->maxItemSize()] __attribute__((aligned (8)));
134  void* tmp = msgWrapped;
135  bool disp = false;
136  if (buf->tryTake(tmp, 0, waitDuration(c, maxBlockingTime))) {
137  disp = MessageDispacher<CcClient, typename CcClient::Interests>()(
138  c, *static_cast<MessageHead*>(tmp));
139  }
140  c.CcClient::invokedCb(disp?1:0);
141  } else {
142  c.CcClient::invokedCb(0);
143  auto d = waitDuration(c, maxBlockingTime);
144  if (d) usleep(d.microseconds());
145  }
146  } catch (std::exception const& e) {
147  if (!stopped) {
148  c.stopped(e);
149  return !c.dropped();
150  }
151  } catch (int code) {
152  if (!stopped) {
153  c.stopped(ExitCode(code));
154  return !c.dropped();
155  }
156  } catch (...) {
157  if (!stopped) {
158  c.stopped(UnknownException());
159  return !c.dropped();
160  }
161  }
162  return true;
163 }
164 
165 struct Transport {
166  Transport(size_t maxItemSize, size_t capacity)
167  : buffer(maxItemSize + sizeof(MessageHead), capacity){}
169 };
170 
171 template <typename Message>
173  std::shared_ptr<Transport> transport;
174  std::function<bool (Message const&)> deliverPred;
175  std::function<bool (uint16_t, uint8_t const*)> deliverJustBytesPred;
176 };
177 
178 template <>
180  std::shared_ptr<Transport> transport;
181  std::function<bool (uint16_t, uint8_t const*)> deliverPred;
182  std::function<bool (uint16_t, uint8_t const*)> deliverJustBytesPred;
183 };
184 
185 } //blocking_context_detail
186 
187 /**
188  * @class BlockingContext<>
189  * @brief A BlockingContext is like a media object that facilitates the communications
190  * for the Clients that it is holding. Each Client is powered by a single OS thread.
191  * a Client needs to be started once and only once to a single BlockingContext
192  * before any messages sending happens - typically in the initialization stage in main(),
193  * undefined behavior otherwise.
194  * @details a Client running in such a BlockingContext utilizing OS's blocking mechanism
195  * and takes less CPU time. The Client's responding time scales better when the number of
196  * Clients greatly exceeds the availlable CPUs in the system and the effective message rate
197  * for a Client tends to be low.
198  * @tparam MessageTuples std tuple capturing the Messages that the Context is supposed to
199  * deliver. Messages that not listed here are silently dropped to ensure loose coupling
200  * between senders and receivers
201  */
202 template <MessageTupleC... MessageTuples>
204 private:
206  template <typename Message> struct can_handle;
207 public:
208  /**
209  * @class Interests
210  * @brief a std tuple holding messages types it can dispatch
211  * @details will not compile if using the Context to deliver a message not in this tuple
212  *
213  */
214  using Interests = typename cpa::Interests;
215 
216  struct deliverAll {
217  template <typename T>
218  bool operator()(T&&) {return true;}
219  bool operator()(uint16_t, uint8_t const*) {return true;}
220  };
221 
222  /**
223  * @brief trivial ctor
224  */
226  : stopped_(false) {}
227 
228  BlockingContext(BlockingContext const&) = delete;
229  BlockingContext& operator = (BlockingContext const&) = delete;
230 
231  /**
232  * @brief start a client within the Context. The client is powered by a single OS thread.
233  * @details it is ok if a Client is blocking, if its own buffer is not full, it
234  * doesn't affect other Client's capabilities of receiving Messages
235  *
236  * @tparam Client actual Client type
237  * @tparam DeliverPred a condition functor deciding if a message should be delivered to a
238  * Client, which provides filtering before the Message type filtering. It improves performance
239  * in the way it could potentially reduce the unblocking times of thread
240  *
241  * Usage example:
242  * //the following Pred verifies srcId matches for Response, and let all other Messages types
243  * //deliver
244  * @code
245  * struct Pred {
246  * Pred(uint16_t srcId)
247  * : srcId(srcId){}
248  *
249  * bool operator()(Response const& resp) {
250  * return resp.srcId == srcId;
251  * }
252  * template <typename M>
253  * bool operator()(M const&) {return true;}
254  *
255  * uint16_t srcId;
256  * };
257  *
258  * @endcode
259  *
260  * @param c Client
261  * @param capacity the maximum messages this Client can buffer
262  * @param maxItemSize the max size of a message - when the Client is interetsed
263  * in JustBytes, this value MUST be big enough to hold the max size message to
264  * avoid truncation
265  * @param cpuAffinity cpu affinity mask for this Client's thread
266  * @param maxBlockingTime it is recommended to limit the duration a Client blocks due to
267  * no messages to handle, so it can respond to things like Context is stopped, or generate
268  * heartbeats if applicable.
269  * @param pred see DeliverPred documentation
270  */
271  template <typename Client, typename DeliverPred = deliverAll>
272  void start(Client& c
273  , size_t capacity = 1024
275  , uint64_t cpuAffinity = 0
276  , time::Duration maxBlockingTime = time::Duration::seconds(1)
277  , DeliverPred&& pred = DeliverPred()) {
278  auto t = registerToRun(c, capacity, maxItemSize, std::forward<DeliverPred>(pred));
279  auto thrd = kickOffClientThread(blocking_context_detail::runOnceImpl<Client>
280  , c, t.use_count() == 1?nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
281  threads_.push_back(move(thrd));
282  }
283 
284  private: using Transport = blocking_context_detail::Transport;
285 public:
286  using ClientRegisterHandle = std::shared_ptr<Transport>;
287  /**
288  * @brief same as start as to the paramater arguments, except it just register
289  * the Client with the context without actually starting the Client thread.
290  * The user is epxected to use runOnce() to manully power the thread
291  *
292  * @tparam Client see start() above
293  * @tparam DeliverPred see start() above
294  * @param c see start() above
295  * @param capacity see start() above
296  * @param maxItemSize see start() above
297  * @param pred see start() above
298  * @return a handle that is to be passed back in the runOnce
299  */
300  template <typename Client, typename DeliverPred = deliverAll>
301  ClientRegisterHandle registerToRun(Client& c
302  , size_t capacity = 1024
304  , DeliverPred&& pred = DeliverPred()) {
306  HMBDC_THROW(std::out_of_range, "maxItemSize is too small");
307  std::shared_ptr<Transport> t(new Transport(maxItemSize, capacity));
308  using RI = typename
311  msgConduits_, t, std::forward<DeliverPred>(pred));
312  return t;
313  }
314 
315  /**
316  * @brief run the Client previous registered to process exsiting Messages
317  *
318  * @tparam CcClient
319  * @param t the handle returned from registerToRun
320  * @param c the Client see start() above
321  * @param maxBlockingTime see start() above
322  * @return true when the Client did not terminate itself by throwing an exeption
323  * @return false otherwise
324  */
325  template <typename CcClient>
326  bool runOnce(ClientRegisterHandle& t, CcClient& c, time::Duration maxBlockingTime) {
327  bool stopped = false;
328  return runOnceImpl(stopped, &t->buffer, c, maxBlockingTime);
329  }
330 
331  /**
332  * @brief start a group of clients within the Context that collectively processing
333  * messages in a load sharing manner. Each client is powered by a single OS thread
334  * @details it is ok if a Client is blocking, if its own buffer is not full, it
335  * doesn't affect other Client's capabilities of receiving Messages
336  *
337  * @tparam LoadSharingClientPtrIt iterator to a Client pointer
338  * @tparam DeliverPred a condition functor deciding if a message should be delivered to these
339  * Clients, which provides filtering before the Message type filtering. It improves performance
340  * in the way it could potentially reduce the unblocking times of threads.
341  * An exception is for Client that are interested in JustBytes (basically every message),
342  * this filter does not apply - no filtering for these Clients
343  *
344  * @param begin an iterator, **begin should produce a Client& for the first Client
345  * @param end an end iterator, [begin, end) is the range for Clients
346  * @param capacity the maximum messages this Client can buffer
347  * @param maxItemSize the max size of a message - when the Client is interetsed
348  * in JustBytes, this value MUST be big enough to hold the max size message to
349  * avoid truncation
350  * @param cpuAffinity cpu affinity mask for this Client's thread
351  * @param maxBlockingTime it is recommended to limit the duration a Client blocks due to
352  * no messages to handle, so it can respond to things like Context is stopped, or generate
353  * heartbeats if applicable.
354  * @param pred see DeliverPred documentation
355  */
356  template <typename LoadSharingClientPtrIt, typename DeliverPred = deliverAll>
357  void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end
358  , size_t capacity = 1024
359  , size_t maxItemSize = max_size_in_tuple<
360  typename std::decay<decltype(**LoadSharingClientPtrIt())>::type::Interests
361  >::value
362  , uint64_t cpuAffinity = 0
363  , time::Duration maxBlockingTime = time::Duration::seconds(1)
364  , DeliverPred&& pred = DeliverPred()) { //=[](auto&&){return true;}
365  using Client = typename std::decay<
366  decltype(**LoadSharingClientPtrIt())
367  >::type;
369  HMBDC_THROW(std::out_of_range, "maxItemSize is too small");
370  if (begin == end) return;
371  std::shared_ptr<Transport> t(new Transport(maxItemSize, capacity));
372 
373  using RI = typename
376  msgConduits_, t, std::forward<DeliverPred>(pred));
377  while(begin != end) {
378  auto thrd = kickOffClientThread(blocking_context_detail::runOnceLoadSharingImpl<Client>
379  , **begin++, t.use_count() == 1?nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
380  threads_.push_back(std::move(thrd));
381  }
382  }
383 
384  /**
385  * @brief stop the message dispatching - asynchronously
386  * @details asynchronously means not garanteed message dispatching
387  * stops immidiately after this non-blocking call
388  */
389  void stop() {
390  __atomic_thread_fence(__ATOMIC_ACQUIRE);
391  stopped_ = true;
392  // std::apply(
393  // [](auto&&... args) {
394  // (std::for_each(args.begin(), args.end(), [](auto&& entry) {
395  // entry.transport->buffer.reset();
396  // }), ...);
397  // }
398  // , msgConduits_
399  // );
400  }
401 
402  /**
403  * @brief wait until all threads of the Context exit
404  * @details blocking call
405  */
406  void join() {
407  for (auto& t : threads_) {
408  t.join();
409  }
410  threads_.clear();
411  }
412 
413  /**
414  * @brief send a message to the BlockingContext to dispatch
415  * @details only the Clients that handles the Message will get it
416  * This function is threadsafe, which means you can call it anywhere in the code
417  *
418  * @tparam Message type
419  * @param m message
420  */
421  template <MessageC Message>
422  void send(Message&& m) {
423  if constexpr (can_handle<Message>::match) {
424  justBytesSend(m);
425  using M = typename std::decay<Message>::type;
426  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
427  static_assert(i < std::tuple_size<MsgConduits>::value, "unexpected message type");
428  auto& entries = std::get<i>(msgConduits_);
429  for (auto i = 0u; i < entries.size(); ++i) {
430  auto& e = entries[i];
431  if (e.deliverPred(m)) {
432  if constexpr (blocking_context_detail::has_hmbdc_ctx_queued_ts<M>::value) {
433  auto& tmp = const_cast<M&>(m);
434  tmp.hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
435  }
436 
437  if (i == entries.size() - 1) {
438  e.transport->buffer
439  .template putInPlace<MessageWrap<M>>(std::forward<Message>(m));
440  } else {
441  e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
442  }
443  }
444  }
445  } else {
446  justBytesSend(std::forward<Message>(m));
448  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
449  , m.getTypeTag());)
450  }
451  }
452  }
453 
454  /**
455  * @brief send a message by directly constructing the message in receiving message queue
456  * @details since the message only exists after being deleivered, the DeliverPred
457  * is not called to decide if it should be delivered
458  *
459  * @param args ctor args
460  * @tparam Message type
461  * @tparam typename ... Args args
462  */
463  template <MessageC Message, typename ... Args>
464  void sendInPlace(Args&&... args) {
465  static_assert(!Message::justBytes, "use sendJustBytesInPlace");
466  if constexpr (can_handle<Message>::match) {
467  justBytesSendInPlace<Message>(args...);
468  using M = typename std::decay<Message>::type;
469  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
470  auto& entries = std::get<i>(msgConduits_);
471  for (auto i = 0u; i < entries.size(); ++i) {
472  auto& e = entries[i];
473  if (i == entries.size() - 1) {
474  e.transport->buffer
475  .template putInPlace<MessageWrap<M>>(std::forward<Args>(args)...);
476  } else {
477  e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
478  }
479  }
480  } else {
481  justBytesSendInPlace<Message>(std::forward<Args>(args)...);
483  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
484  , std::decay<Message>::type::typeTag);)
485  }
486  }
487  }
488 
489  void sendJustBytesInPlace(uint16_t tag, void const* bytes, size_t len, hasMemoryAttachment* att) {
490  std::apply(
491  [=](auto&&... args) {
492  (args.applyJustBytes(tag, bytes, len, att), ...) ;
493  }
494  , msgConduits_
495  );
496  }
497 
498  /**
499  * @brief best effort to send a message to via the BlockingContext
500  * @details this method is not recommended if more than one recipients are accepting
501  * this message since there is no garantee each one will receive it once and only once.
502  * this call does not block - return false when deliver doesn't reach all
503  * intended recipients
504  * This method is threadsafe, which means you can call it anywhere in the code
505  *
506  * @param m message
507  * @param timeout return false if cannot deliver in the specified time
508  * @return true if send successfully to every intended receiver
509  */
510  template <MessageC Message>
511  bool trySend(Message&& m, time::Duration timeout = time::Duration::seconds(0)) {
512  if constexpr (can_handle<Message>::match) {
513  if (!justBytesTrySend(m)) return false;
514  using M = typename std::decay<Message>::type;
515  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
516  auto& entries = std::get<i>(msgConduits_);
517  for (auto i = 0u; i < entries.size(); ++i) {
518  auto& e = entries[i];
519  if (e.deliverPred(m)) {
520  if constexpr (blocking_context_detail::has_hmbdc_ctx_queued_ts<M>::value) {
521  auto& tmp = const_cast<M&>(m);
522  tmp.hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
523  }
524  if (i == entries.size() - 1) {
525  if (!e.transport->buffer
526  .tryPut(MessageWrap<M>(std::forward<Message>(m)), timeout)) {
527  return false;
528  }
529  } else {
530  if (!e.transport->buffer.tryPut(MessageWrap<M>(m), timeout)) {
531  return false;
532  }
533  }
534  }
535  }
536  return true;
537  } else {
539  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
540  , std::decay<Message>::type::typeTag);)
541  }
542 
543  return justBytesTrySend(std::forward<Message>(m));
544  }
545  }
546 
547  /**
548  * @brief best effort to send a message by directly constructing the message in
549  * receiving message queue
550  * @details since the message only exists after being deleivered, the DeliverPred
551  * is not called to decide if it should be delivered
552  * this method is not recommended if more than one recipients are accepting
553  * this message since it is hard to ensure each message is delivered once and only once.
554  * this call does not block - return false when delivery doesn't reach ALL
555  * intended recipients
556  * This method is threadsafe, which means you can call it anywhere in the code
557  *
558  * @return true if send successfully to every intended receiver
559  */
560  template <MessageC Message, typename ... Args>
561  bool trySendInPlace(Args&&... args) {
562  if constexpr (can_handle<Message>::match) {
563  if (!justBytesTrySendInPlace<Message>(args...)) return false;
564  using M = typename std::decay<Message>::type;
565  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
566  auto& entries = std::get<i>(msgConduits_);
567  for (auto i = 0u; i < entries.size(); ++i) {
568  auto& e = entries[i];
569  if (i == entries.size() - 1) {
570  if (!e.transport->buffer.template
571  tryPutInPlace<MessageWrap<M>>(std::forward<Args>(args)...)) {
572  return false;
573  }
574  } else {
575  if (!e.transport->buffer.template
576  tryPutInPlace<MessageWrap<M>>(args...)) {
577  return false;
578  }
579  }
580  }
581  return true;
582  } else {
584  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
585  , std::decay<Message>::type::typeTag);)
586  }
587 
588  return justBytesTrySendInPlace<Message>(args...);
589  }
590  }
591 
592  /**
593  * @brief send a range of messages via the BlockingContext
594  * - only checking with deliverPred using the first message
595  * @details only the Clients that handles the Message will get it of course
596  * This function is threadsafe, which means you can call it anywhere in the code
597  *
598  * @param begin a forward iterator point at the start of the range
599  * @param n length of the range
600  */
601  template <MessageForwardIterC ForwardIt>
602  void send(ForwardIt begin, size_t n) {
603  if constexpr (can_handle<decltype(*(ForwardIt()))>::match) {
604  if (!n) return;
605  justBytesSend(begin, n);
606  using Message = decltype(*begin);
607  using M = typename std::decay<Message>::type;
608  auto constexpr i = index_in_tuple<TransportEntries<M>, MsgConduits>::value;
609  auto& entries = std::get<i>(msgConduits_);
610  for (auto i = 0u; i < entries.size(); ++i) {
611  auto& e = entries[i];
612  if (e.deliverPred(*begin)) {
613  while (!e.transport->buffer.template
614  tryPutBatchInPlace<MessageWrap<M>>(begin, n)) {}
615  }
616  }
617  } else {
618  if (!can_handle<decltype(*(ForwardIt()))>::justbytes) {
619  HMBDC_LOG_ONCE(HMBDC_LOG_D("unhandled message sent typeTag="
620  , std::decay<decltype(*(ForwardIt()))>::type::typeTag);)
621  }
622  justBytesSend(begin, n);
623  }
624  }
625 
626 private:
627  void start(){}
628  template <typename Message>
629  using TransportEntry = blocking_context_detail::TransportEntry<Message>;
630 
631  template <typename Message>
632  struct TransportEntries : std::vector<TransportEntry<Message>> {
633  // using vector::vector;
634  void applyJustBytes(uint16_t tag, void const* bytes
635  , size_t len, hasMemoryAttachment* att) {
636  if (typeTagMatch<Message>(tag)
637  && std::is_trivially_destructible<Message>::value) {
638  auto& entries = *this;
639  for (auto i = 0u; i < entries.size(); ++i) {
640  auto& e = entries[i];
641  // bool pred = true;
642  // if constexpr (Message::justBytes) {
643  // pred = e.deliverPred(tag, (uint8_t*)bytes);
644  // }
645  if (e.deliverJustBytesPred(tag, (uint8_t const*)bytes)) {
646  len = std::min(len
647  , e.transport->buffer.maxItemSize() - sizeof(MessageHead));
648  e.transport->buffer.template
649  putInPlace<MessageWrap<JustBytes>>(tag, bytes, len, att);
650  }
651  }
652  }
653  }
654  };
655 
656  template <typename Tuple> struct MCGen;
657  template <typename ...Messages>
658  struct MCGen<std::tuple<Messages ...>> {
659  using type = std::tuple<TransportEntries<Messages> ...>;
660  };
661 
662  using MsgConduits = typename MCGen<Interests>::type;
663 
664  MsgConduits msgConduits_;
665  using Threads = std::vector<std::thread>;
666  Threads threads_;
667  bool stopped_;
668 
669  template <typename MsgConduits, typename DeliverPred, typename Tuple>
670  struct setupConduit {
671  void operator()(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&){}
672  };
673 
674  struct createEntry {
675  template <bool careMsg, size_t i, typename M, typename MsgConduits, typename DeliverPred>
676  typename std::enable_if<careMsg, void>::type
677  doOrNot(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred) {
678  auto& entries = std::get<i>(msgConduits);
679  if constexpr (std::is_trivially_destructible<M>::value) {
680  entries.emplace_back(TransportEntry<M>{t, pred, pred});
681  } else {
682  entries.emplace_back(TransportEntry<M>{t, pred});
683  }
684  }
685 
686  template <bool careMsg, size_t i, typename M, typename MsgConduits, typename DeliverPred>
687  typename std::enable_if<!careMsg, void>::type
688  doOrNot(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&) {
689  }
690  };
691 
692  template <typename MsgConduits, typename DeliverPred, typename M, typename ...Messages>
693  struct setupConduit<MsgConduits, DeliverPred, std::tuple<M, Messages...>> {
694  void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred){
695  auto constexpr i = index_in_tuple<M, Interests>::value;
696  createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
697  msgConduits, t, pred);
698  setupConduit<MsgConduits, DeliverPred, std::tuple<Messages...>>()
699  (msgConduits,t, std::forward<DeliverPred>(pred));
700  }
701  };
702 
703  template <typename Message>
704  struct can_handle {
705  private:
706  using M = typename std::decay<Message>::type;
707  public:
708  enum {
709  index = index_in_tuple<TransportEntries<M>, MsgConduits>::value,
710  match = index < std::tuple_size<MsgConduits>::value,
711  justbytes_index = index_in_tuple<TransportEntries<JustBytes>, MsgConduits>::value,
712  justbytes = justbytes_index < std::tuple_size<MsgConduits>::value
713  && std::is_trivially_destructible<M>::value? 1 :0,
714  };
715  };
716 
717  template <typename Client, typename RunOnceFunc>
718  auto kickOffClientThread(RunOnceFunc runOnceFunc, Client& c, pattern::BlockingBuffer* buffer
719  , uint64_t mask, time::Duration maxBlockingTime) {
720  if (buffer == nullptr) {
721  HMBDC_LOG_ONCE(HMBDC_LOG_D("starting a Client that cannot receive messages, Client typid()="
722  , typeid(Client).name());)
723  }
724  std::thread thrd([
725  this
726  , &c
727  , buffer
728  , mask
729  , maxBlockingTime
730  , runOnceFunc]() {
731  std::string name;
732  char const* schedule;
733  int priority;
734 
735  if (c.hmbdcName()) {
736  name = c.hmbdcName();
737  } else {
738  name = "hmbdc-b";
739  }
740  try {
741  auto cpuAffinityMask = mask;
742  std::tie(schedule, priority) = c.schedSpec();
743 
744  if (!schedule) schedule = "SCHED_OTHER";
745 
746  os::configureCurrentThread(name.c_str(), cpuAffinityMask
747  , schedule, priority);
748  __atomic_add_fetch(&dispStartCount_, 1, __ATOMIC_RELEASE);
749  c.messageDispatchingStartedCb(&dispStartCount_);
750  } catch (std::exception const& e) {
751  c.stopped(e);
752  return;
753  } catch (int code) {
754  c.stopped(ExitCode(code));
755  return;
756  } catch (...) {
758  return;
759  }
760 
761  while(!stopped_ && runOnceFunc(this->stopped_, buffer, c, maxBlockingTime)) {
762  }
763  if (stopped_) {
764  if (buffer) { /// drain all to release sending party
766  size_t count;
767  do {
768  usleep(10000);
769  count = buffer->peek(begin, end);
770  buffer->wasteAfterPeek(count);
771  } while (count);
772  }
773  c.dropped();
774  }
775  }
776  );
777 
778  return thrd;
779  }
780 
781 
782  template <MessageC Message>
783  void justBytesSend(Message&& m) {
784  if constexpr (can_handle<Message>::justbytes) {
785  using M = typename std::decay<Message>::type;
786  auto constexpr i = can_handle<Message>::justbytes_index;
787  auto& entries = std::get<i>(msgConduits_);
788  for (auto i = 0u; i < entries.size(); ++i) {
789  auto& e = entries[i];
790  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
791  if (e.deliverPred(m.getTypeTag(), (uint8_t*)&m)) {
792  e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
793  }
794  } else {
795  HMBDC_THROW(std::out_of_range
796  , "need to increase maxItemSize in start() to >= "
797  << sizeof(MessageWrap<M>));
798  }
799  }
800  }
801  }
802 
803  template <MessageC Message, typename ... Args>
804  void justBytesSendInPlace(Args&& ...args) {
805  if constexpr (can_handle<Message>::justbytes) {
806  using M = typename std::decay<Message>::type;
807  static_assert(!std::is_same<JustBytes, M>::value, "use sendJustBytesInPlace");
808  auto constexpr i = can_handle<Message>::justbytes_index;
809  auto& entries = std::get<i>(msgConduits_);
810  for (auto i = 0u; i < entries.size(); ++i) {
811  auto& e = entries[i];
812  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
813  e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
814  } else {
815  HMBDC_THROW(std::out_of_range
816  , "need to increase maxItemSize in start() to >= "
817  << sizeof(MessageWrap<M>));
818  }
819  }
820  }
821  }
822 
823  template <MessageC Message>
824  bool justBytesTrySend(Message&& m, time::Duration timeout) {
825  if constexpr (can_handle<Message>::justbytes) {
826  using M = typename std::decay<Message>::type;
827  static_assert(!std::is_same<JustBytes, M>::value, "wrong args");
828  auto constexpr i = can_handle<Message>::justbytes_index;
829  auto& entries = std::get<i>(msgConduits_);
830  for (auto i = 0u; i < entries.size(); ++i) {
831  auto& e = entries[i];
832  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
833  if (e.deliverPred(m.getTypeTag(), (uint8_t*)&m)) {
834  if (!e.transport->buffer.tryPut(MessageWrap<M>(m), timeout)) {
835  return false;
836  }
837  }
838  } else {
839  HMBDC_THROW(std::out_of_range
840  , "need to increase maxItemSize in start() to >= "
841  << sizeof(MessageWrap<M>));
842  }
843  }
844  }
845  return true;
846  }
847 
848  template <MessageC Message, typename ... Args>
849  bool justBytesTrySendInPlace(Args&&... args) {
850  if constexpr (can_handle<Message>::justbytes) {
851  using M = typename std::decay<Message>::type;
852  auto constexpr i = can_handle<Message>::justbytes_index;
853  auto& entries = std::get<i>(msgConduits_);
854  for (auto i = 0u; i < entries.size(); ++i) {
855  auto& e = entries[i];
856  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
857  if (!e.transport->buffer.template
858  tryPutInPlace<MessageWrap<M>>(args...)) {
859  return false;
860  }
861  } else {
862  HMBDC_THROW(std::out_of_range
863  , "need to increase maxItemSize in start() to >= "
864  << sizeof(MessageWrap<M>));
865  }
866  }
867  }
868  return true;
869  }
870 
871  template <MessageForwardIterC ForwardIt>
872  void justBytesSend(ForwardIt begin, size_t n) {
873  if constexpr (can_handle<decltype(*(ForwardIt()))>::justbytes) {
874  using Message = decltype(*begin);
875  using M = typename std::decay<Message>::type;
876  auto constexpr i = can_handle<decltype(*(ForwardIt()))>::justbytes_index;
877  auto& entries = std::get<i>(msgConduits_);
878  for (auto i = 0u; i < entries.size(); ++i) {
879  auto& e = entries[i];
880 
881  if (e.transport->buffer.maxItemSize() >= sizeof(MessageWrap<M>)) {
882  if (e.deliverPred(begin->getTypeTag(), (uint8_t*)&*begin)) {
883  while (!e.transport->buffer.template tryPutBatchInPlace<MessageWrap<M>>(begin, n)) {}
884  }
885  } else {
886  HMBDC_THROW(std::out_of_range
887  , "need to increase maxItemSize in start() to >= "
888  << sizeof(MessageWrap<M>));
889  }
890  }
891  }
892  }
893 
894  private:
895  size_t dispStartCount_ = 0;
896 
897 
898 };
899 }}
Definition: BlockingBuffer.hpp:17
Definition: MetaUtils.hpp:84
void sendInPlace(Args &&... args)
send a message by directly constructing the message in receiving message queue
Definition: BlockingContext.hpp:464
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:137
Definition: BlockingContext.hpp:632
void send(ForwardIt begin, size_t n)
send a range of messages via the BlockingContext
Definition: BlockingContext.hpp:602
bool runOnce(ClientRegisterHandle &t, CcClient &c, time::Duration maxBlockingTime)
run the Client previous registered to process exsiting Messages
Definition: BlockingContext.hpp:326
typename std::conditional< index_in_tuple< JustBytes, Interests >::value==std::tuple_size< Interests >::value, Interests, typename merge_tuple_unique< std::tuple< JustBytes >, no_trivial >::type >::type type
avoid double delivery when JustBytes is involved
Definition: BlockingContext.hpp:36
bool trySendInPlace(Args &&... args)
best effort to send a message by directly constructing the message in receiving message queue ...
Definition: BlockingContext.hpp:561
Definition: TypedString.hpp:84
std::tuple< char const *, int > schedSpec() const
an overrideable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:147
Definition: BlockingContext.hpp:25
Definition: Timers.hpp:70
Definition: BlockingContext.hpp:656
Definition: MetaUtils.hpp:48
Definition: BlockingContext.hpp:165
Unknown excpetion.
Definition: Exception.hpp:17
Definition: BlockingContext.hpp:172
Definition: BlockingContext.hpp:206
bool trySend(Message &&m, time::Duration timeout=time::Duration::seconds(0))
best effort to send a message to via the BlockingContext
Definition: BlockingContext.hpp:511
Definition: BlockingContext.hpp:216
ClientRegisterHandle registerToRun(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, DeliverPred &&pred=DeliverPred())
same as start as to the paramater arguments, except it just register the Client with the context with...
Definition: BlockingContext.hpp:301
void stopped(std::exception const &e) noexcept
the following are for internal use, don&#39;t change or override
Definition: Client.hpp:248
A BlockingContext is like a media object that facilitates the communications for the Clients that it ...
Definition: BlockingContext.hpp:203
Definition: Message.hpp:212
Definition: BlockingContext.hpp:674
A special type of message only used on the receiving side.
Definition: Message.hpp:341
void start(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a client within the Context. The client is powered by a single OS thread.
Definition: BlockingContext.hpp:272
Definition: MetaUtils.hpp:265
void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename std::decay< decltype(**LoadSharingClientPtrIt())>::type::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a group of clients within the Context that collectively processing messages in a load sharing m...
Definition: BlockingContext.hpp:357
void join()
wait until all threads of the Context exit
Definition: BlockingContext.hpp:406
void stop()
stop the message dispatching - asynchronously
Definition: BlockingContext.hpp:389
Definition: Message.hpp:263
Definition: Time.hpp:134
BlockingContext()
trivial ctor
Definition: BlockingContext.hpp:225
Definition: BlockingContext.hpp:670
Definition: MetaUtils.hpp:100
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:122
Exception that just has an exit code.
Definition: Exception.hpp:28
virtual void messageDispatchingStartedCb(size_t const *pClientDispatchingStarted)
called before any messages got dispatched - only once
Definition: Client.hpp:179
Definition: BlockingBuffer.hpp:264
void send(Message &&m)
send a message to the BlockingContext to dispatch
Definition: BlockingContext.hpp:422
a std tuple holding messages types it can dispatch
Definition: BlockingContext.hpp:206
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Base.hpp:12
auto kickOffClientThread(RunOnceFunc runOnceFunc, Client &c, pattern::BlockingBuffer *buffer, uint64_t mask, time::Duration maxBlockingTime)
Definition: BlockingContext.hpp:718
Definition: BlockingContext.hpp:40