hmbdc
simplify-high-performance-messaging-programming
Context.hpp
#include "hmbdc/Copyright.hpp"
#pragma once


#include "hmbdc/app/StuckClientPurger.hpp"
#include "hmbdc/Config.hpp"
#include "hmbdc/numeric/BitMath.hpp"
#include "hmbdc/time/Time.hpp"

#include <boost/interprocess/allocators/allocator.hpp>
#include <memory>
#include <optional> // for std::optional used below
#include <string>
#include <thread>
#include <vector>
#include <list>
#include <mutex>

namespace hmbdc { namespace app {


/**
* @example hello-world.cpp
* @example hmbdc.cpp
* @example hmbdc-log.cpp
* @example ipc-market-data-propagate.cpp
*/

/**
 * @namespace hmbdc::app::context_property
 * contains the trait types that define how a Context behaves and what capabilities it has
 */
namespace context_property {
/**
 * @class broadcast
 * @brief Context template parameter indicating each message is
 * sent to all clients within the Context.
 * This is the default property of a Context.
 * @details each message is still subject to the Client's message type
 * filtering. In the case of an ipc Context,
 * it is also sent to all clients in the attached ipc Contexts.
 * When the Context is specialized using this type, the Context normally
 * works with heterogeneous Clients and all Clients can talk to each
 * other thru the Context. Load balancing among Clients can be achieved by
 * having the participating Clients coordinate to select messages to process.
 * In addition to the direct mode Clients, a Client running pool is supported
 * by the Context - see the pool related functions in Context.
 *
 * Implicit usage in hello-world.cpp: @snippet hello-world.cpp broadcast as default
 * Explicit usage in hmbdc.cpp: @snippet hmbdc.cpp explicit using broadcast
 * There is no hard coded limit on how many Clients can be added into a pool.
 * Also, there is no limit on when you can add a Client into a pool.
 * @tparam max_parallel_consumer max count of threads that process messages,
 * which includes pool threads plus the count of direct mode Clients that
 * register for messages within the Context
 * supported values: 4 (default);
 * 2, 8, 16, 32, 64, 128, 256 require an hmbdc license
 */
template <uint16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
struct broadcast{
    static_assert(max_parallel_consumer >= 4u
        , "unsupported max_parallel_consumer"); // assertion completed editorially; the original message was lost in extraction
};

/**
 * @class partition
 * @brief Context template parameter indicating each message is
 * sent to one and only one of the clients within the Context
 * and its attached ipc Contexts if applicable.
 * @details each message is still subject to the Client's message type
 * filtering.
 * When the Context is specialized using this type, the Context normally
 * works with homogeneous Clients to achieve load balancing thru threads. No
 * coordination is needed between Clients.
 * Only direct mode Clients are supported; a thread pool is NOT supported
 * by this kind of Context - the pool related functions in Context are also disabled
 *
 * Example in server-cluster.cpp: @snippet server-cluster.cpp declare a partition context
 */
struct partition{};

/**
 * @class msgless_pool
 * @brief Context template parameter indicating the Context must contain a pool to run Clients,
 * and the Clients in the pool shall not receive messages - unlike the default pool.
 * @details msgless_pool performs better when its Clients don't need to receive messages from the Context.
 * This is useful when the Clients are network transport engines. By default, a partition Context
 * doesn't come with a pool for semantic reasons, but this Context property enables a pool that
 * does not deliver messages.
 */
struct msgless_pool{};

/**
 * @class ipc_enabled
 * @brief Context template parameter indicating the Context is ipc enabled and
 * it can create or be attached to an ipc transport thru a transport name.
 * @details In addition to the normal Context functions, the Context acts either as
 * the creator (owner) of the named ipc transport or an attacher to the transport.
 * Since the creator performs the critical function of purging crashed or
 * stuck Clients to avoid filling the buffer for other well-behaving Clients, it is
 * expected to be running (started) for as long as the ipc transport functions.
 * The ipc transport uses persistent shared memory, and if the dtor of the Context is not
 * called due to a crash, there will be stale shared memory in /dev/shm.
 * It is very important that the Context is constructed with exactly
 * the same size (see constructor) and type as the ipc
 * transport creator specified.
 * All Contexts attaching to a single ipc transport are collectively subject to the
 * max_parallel_consumer limits just like a single local (non-ipc) Context is.
 * Example in ipc-market-data-propagate.cpp @snippet ipc-market-data-propagate.cpp declare an ipc context
 */
struct ipc_enabled{};

/**
 * @class pci_ipc
 * @brief when processes are distributed on a PCIe board and the host PC, add this property
 * @details beta
 *
 */
struct pci_ipc{};
} // namespace context_property
}} // namespace hmbdc::app

#include "hmbdc/app/ContextDetail.hpp"
namespace hmbdc { namespace app {

namespace context_detail {
HMBDC_CLASS_HAS_DECLARE(hmbdc_ctx_queued_ts);
HMBDC_CLASS_HAS_DECLARE(hmbdcIpcFrom);
HMBDC_CLASS_HAS_DECLARE(ibmaProc);

using namespace std;
using namespace hmbdc::pattern;

/**
 * @class ThreadCommBase<>
 * @brief covers the inter-thread and ipc communication facade
 * @details this type's interface is exposed thru Context and the type itself is
 * not directly used by users
 * @tparam MaxMessageSize the max message size, which needs to be known at compile time;
 * if the value can only be determined at runtime, set this to 0. Things can still work
 * but will lose some compile time checking advantages, see maxMessageSizeRuntime below
 * @tparam ContextProperties see types in the context_property namespace
 */
template <size_t MaxMessageSize, typename... ContextProperties>
struct ThreadCommBase
: private context_detail::context_property_aggregator<ContextProperties...> {
    using cpa = context_property_aggregator<ContextProperties...>;
    using Buffer = typename cpa::Buffer;
    using Allocator = typename cpa::Allocator;

    enum {
        MAX_MESSAGE_SIZE = MaxMessageSize,
        BUFFER_VALUE_SIZE = MaxMessageSize + sizeof(MessageHead), // 8 bytes for wrap
    };

    size_t maxMessageSize() const {
        if (MaxMessageSize == 0) return maxMessageSizeRuntime_;
        return MaxMessageSize;
    }

    /**
     * @brief send a batch of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
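     *
     * A minimal usage sketch (Ping and Pong are assumed trivially destructible
     * hmbdc message types, and ctx is a started Context):
     * @code
     * Ping ping; Pong pong;
     * ctx.send(ping, pong); // claimed and committed as one batch
     * @endcode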
     */
    template <MessageC M0, MessageC M1, typename ... Messages, typename Enabled
        = typename std::enable_if<!std::is_integral<M1>::value, void>::type>
    void
    send(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.claim(n);
        sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
        buffer_.commit(it, n);
    }

    /**
     * @brief try to send a batch of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
     *
     * @return true if sent successfully
     */
    template <MessageC M0, MessageC M1, typename ... Messages, typename Enabled
        = typename std::enable_if<!std::is_integral<M1>::value, void>::type>
    bool
    trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.tryClaim(n);
        if (it) {
            sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
            buffer_.commit(it, n);
            return true;
        }

        return false;
    }

    /**
     * @brief send a range of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
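     *
     * A usage sketch (Tick is an assumed trivially destructible message type):
     * @code
     * std::vector<Tick> ticks(8);
     * ctx.send(ticks.begin(), ticks.size()); // all 8 committed consecutively
     * @endcode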
     */
    template <MessageForwardIterC ForwardIt>
    void
    send(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.claim(n);
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                static_assert(std::is_trivially_destructible<Message>::value
                    , "cannot send message with dtor");
                static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
                    , "hasMemoryAttachment Messages cannot be sent in group");
                auto wrap = new (*it++) MessageWrap<Message>(*begin++);
                if constexpr (has_hmbdc_ctx_queued_ts<Message>::value) {
                    wrap->template get<Message>().hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
                } else (void)wrap;
            }
            buffer_.commit(bit, n);
        }
    }

    /**
     * @brief try to send a range of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
     * @return true if sent successfully
     */
    template <MessageForwardIterC ForwardIt>
    bool
    trySend(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.tryClaim(n);
            if (hmbdc_unlikely(!bit)) return false;
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                static_assert(std::is_trivially_destructible<Message>::value
                    , "cannot send message with dtor");
                static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
                    , "hasMemoryAttachment Messages cannot be sent in group");
                auto wrap = new (*it++) MessageWrap<Message>(*begin++);
                if constexpr (has_hmbdc_ctx_queued_ts<Message>::value) {
                    wrap->template get<Message>().hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
                } else (void)wrap;
            }
            buffer_.commit(bit, n);
        }
        return true;
    }

    /**
     * @brief send a message, including a hasMemoryAttachment message, to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code.
     * If sending a hasMemoryAttachment message, the size of the attachment is runtime checked and
     * restricted by the Context capacity
     *
     * @param m message
     * @tparam Message type
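     *
     * A minimal sketch (MarketData is an assumed trivially destructible message type):
     * @code
     * MarketData md;
     * ctx.send(md);           // lvalue: copied into the buffer
     * ctx.send(MarketData{}); // rvalue: moved into the buffer
     * @endcode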
     */
    template <MessageC Message>
    void
    send(Message&& m) {
        using M = typename std::decay<Message>::type;
        static_assert(std::is_trivially_destructible<M>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if constexpr(!std::is_base_of<hasMemoryAttachment, M>::value
            || !cpa::ipc) {
            if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
                HMBDC_THROW(std::out_of_range, "message too big, typeTag=" << m.getTypeTag());
            }

            if constexpr (has_hmbdc_ctx_queued_ts<M>::value) {
                auto it = buffer_.claim();
                auto wrap = new (*it++) MessageWrap<M>(std::forward<Message>(m));
                wrap->template get<M>().hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
                buffer_.commit(it);
            } else {
                buffer_.put(MessageWrap<M>(std::forward<Message>(m)));
            }
        } else {
            if constexpr (cpa::ipc && has_hmbdcShmRefCount<M>::value) {
                if (m.template holdShmHandle<M>()) {
                    if (0 >= m.hmbdcShmRefCount) {
                        return; // no process is interested
                    }
                    auto it = buffer_.claim(1);
                    auto wrap = (new (*it) MessageWrap<InBandHasMemoryAttachment<M>>(m)); (void)wrap;
                    wrap->payload.shmConvert(*shmAttAllocator_);
                    if constexpr (has_hmbdc_ctx_queued_ts<M>::value) {
                        wrap->template get<Message>().hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
                    }
                    buffer_.commit(it, 1);
                    m.hasMemoryAttachment::release();
                    return;
                }
            }
            if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0
                && sizeof(MessageWrap<InBandHasMemoryAttachment<M>>) > buffer_.maxItemSize())) {
                HMBDC_THROW(std::out_of_range, "message too big, typeTag=" << m.getTypeTag());
            }
            auto att = reinterpret_cast<hasMemoryAttachment&>(m);
            size_t segSize = buffer_.maxItemSize() - sizeof(MessageHead);
            auto n = (att.len + segSize - 1) / segSize + 1;
            if (hmbdc_unlikely(n > buffer_.capacity())) {
                HMBDC_THROW(std::out_of_range
                    , "hasMemoryAttachment message too big, typeTag=" << m.getTypeTag());
            }

            auto bit = buffer_.claim(n);
            auto it = bit;
            // InBandHasMemoryAttachment<M> ibm{m};
            auto wrap = (new (*it++) MessageWrap<InBandHasMemoryAttachment<M>>(m)); (void)wrap;
            // wrap->scratchpad().ipc.hd.inbandUnderlyingTypeTag = m.getTypeTag();

            if constexpr (has_hmbdc_ctx_queued_ts<M>::value) {
                wrap->template get<Message>().hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
            }
            auto segStart = (char*)att.attachment;
            auto remaining = (size_t)att.len;
            while(--n) {
                auto wrap = (new (*it++) MessageWrap<InBandMemorySeg>());
                auto& ms = wrap->get<InBandMemorySeg>();
                auto bytes = std::min(segSize, (size_t)remaining);
                wrap->scratchpad().ipc.inbandPayloadLen = bytes;
                memcpy(ms.seg, segStart, bytes);
                segStart += bytes;
                remaining -= bytes;
            }
            buffer_.commit(bit, it - bit);
            att.release();
        }
    }

    /**
     * @brief preallocate consecutive buffers so they can be sent out later
     *
     * @param n how many buffers to allocate - each buffer has the max message size
     * @return an iterator adaptor wrapping the first claimed Buffer::iterator
     */
    template <MessageC Message>
    auto allocateForSend(size_t n) {
        struct IteratorAdaptor {
            typename Buffer::iterator it;
            Message& get() {
                auto wrap = (MessageWrap<Message>*)(*it);
                return wrap->payload;
            }
            auto& operator ++() {
                ++it;
                return *this;
            }
            auto operator ++(int) {
                auto tmp = IteratorAdaptor{it++};
                return tmp;
            }
        };

        auto it = buffer_.claim(n);
        auto res = IteratorAdaptor{it};
        while (n--) {
            new (*it++) MessageWrap<Message>;
        }
        return res;
    }

    /**
     * @brief commit all the filled up buffers allocated by allocateForSend
     * and send them out
     *
     * @tparam IteratorAdaptor the return value type of allocateForSend
     * @param itA the returned value of allocateForSend
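     *
     * Typical pairing with allocateForSend (a sketch; Tick and its price
     * field are assumptions):
     * @code
     * auto itA = ctx.allocateForSend<Tick>(4); // claim 4 slots in place
     * auto cur = itA;
     * for (int i = 0; i < 4; ++i, ++cur) cur.get().price = 100 + i; // fill in place
     * ctx.commitForSend(itA); // publish all 4 at once
     * @endcode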
     */
    template <typename IteratorAdaptor>
    void commitForSend(IteratorAdaptor itA) {
        buffer_.commit(itA.it);
    }

    /**
     * @brief try to send a message, including a hasMemoryAttachment message, to the Context
     * or attached ipc Contexts if it wouldn't block
     * @details this call does not block - returns false when the buffer is full.
     * This function is threadsafe, which means you can call it anywhere in the code.
     * If sending a hasMemoryAttachment message, the size of the attachment is runtime checked and
     * restricted by the Context capacity
     *
     * @param m message
     * @tparam Message type
     * @return true if sent successfully
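     *
     * Non-blocking usage sketch (Heartbeat is an assumed message type):
     * @code
     * Heartbeat hb;
     * while (!ctx.trySend(hb)) {   // buffer full - back off and retry
     *     std::this_thread::yield();
     * }
     * @endcode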
     */
    template <MessageC Message>
    bool trySend(Message&& m) {
        using M = typename std::decay<Message>::type;
        static_assert(std::is_trivially_destructible<M>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");

        if constexpr(!std::is_base_of<hasMemoryAttachment, typename std::decay<Message>::type>::value) {
            if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
                HMBDC_THROW(std::out_of_range, "message too big");
            }
            return buffer_.tryPut(MessageWrap<M>(std::forward<Message>(m)));
        } else {
            if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<InBandHasMemoryAttachment<M>>) > buffer_.maxItemSize())) {
                HMBDC_THROW(std::out_of_range, "message too big, typeTag=" << m.getTypeTag());
            }
            auto att = reinterpret_cast<hasMemoryAttachment&>(m);
            size_t segSize = buffer_.maxItemSize() - sizeof(MessageHead);
            auto n = (att.len + segSize - 1) / segSize + 1;
            if (hmbdc_unlikely(n > buffer_.capacity())) {
                HMBDC_THROW(std::out_of_range, "hasMemoryAttachment message too big, typeTag=" << m.getTypeTag());
            }

            auto bit = buffer_.tryClaim(n);
            if (!bit) return false;
            auto it = bit;
            InBandHasMemoryAttachment<M> ibm{std::forward<Message>(m)};

            (new (*it++) MessageWrap<InBandHasMemoryAttachment<M>>(ibm))
                ->scratchpad().inbandUnderlyingTypeTag = m.getTypeTag();
            auto segStart = (char*)att.attachment;
            auto remaining = (size_t)att.len;
            while(--n) {
                auto wrap = (new (*it++) MessageWrap<InBandMemorySeg>());
                auto& ms = wrap->get<InBandMemorySeg>();
                auto bytes = std::min(segSize, (size_t)remaining);
                wrap->scratchpad().ipc.inbandPayloadLen = bytes;
                memcpy(ms.seg, segStart, bytes);
                segStart += bytes;
                remaining -= bytes;
            }
            buffer_.commit(bit, it - bit);
            att.release();
            return true;
        }
    }

    /**
     * @brief send a message to all Clients in the Context or attached ipc Contexts
     * @details constructs the Message in the buffer directly.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param args ctor args
     * @tparam Message type
     * @tparam typename ... Args args
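     *
     * A sketch, assuming Order has a matching (symbol, price) ctor; this is
     * equivalent to ctx.send(Order{"IBM", 100.0}) minus the intermediate copy:
     * @code
     * ctx.sendInPlace<Order>("IBM", 100.0);
     * @endcode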
     */
    template <MessageC Message, typename ... Args>
    void sendInPlace(Args&&... args) {
        static_assert(!std::is_base_of<JustBytes, Message>::value
            , "use sendJustBytesInPlace");
        static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
            , "hasMemoryAttachment Messages cannot be sent in place");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range
                , "message too big buffer_.maxItemSize()=" << buffer_.maxItemSize());
        }
        buffer_.template putInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }

    template <typename JustBytesType, typename ... Args>
    void sendJustBytesInPlace(uint16_t tag, void const* bytes, size_t len
        , app::hasMemoryAttachment* att, Args&& ...args) {
        if (hmbdc_unlikely(len > maxMessageSize())) {
            HMBDC_THROW(std::out_of_range, "message too big, typeTag=" << tag
                << " len=" << len);
        }
        if (!att || !cpa::ipc) {
            auto it = buffer_.claim();
            new (*it) MessageWrap<JustBytesType>(tag, bytes, len, att
                , std::forward<Args>(args)...);
            buffer_.commit(it);
        } else {
            size_t segSize = buffer_.maxItemSize() - sizeof(MessageHead);
            auto n = (att->len + segSize - 1) / segSize + 1;
            if (hmbdc_unlikely(n > buffer_.capacity())) {
                HMBDC_THROW(std::out_of_range
                    , "hasMemoryAttachment message too big, typeTag=" << tag);
            }

            auto bit = buffer_.claim(n);
            auto it = bit;
            auto wrap = new (*it++) MessageWrap<InBandHasMemoryAttachment<JustBytesType>>(
                tag, bytes, len, att, std::forward<Args>(args)...); (void)wrap;
            // wrap->scratchpad().ipc.hd.inbandUnderlyingTypeTag = tag;

            auto segStart = (char*)att->attachment;
            auto remaining = (size_t)att->len;
            while(--n) {
                auto wrap = (new (*it++) MessageWrap<InBandMemorySeg>());
                auto& ms = wrap->get<InBandMemorySeg>();
                auto bytes = std::min(segSize, (size_t)remaining);
                wrap->scratchpad().ipc.inbandPayloadLen = bytes;
                memcpy(ms.seg, segStart, bytes);
                segStart += bytes;
                remaining -= bytes;
            }
            buffer_.commit(bit, it - bit);
            att->release();
        }
    }


    /**
     * @brief try to send a message to all Clients in the Context or attached ipc Contexts if it wouldn't block
     * @details this call does not block - returns false when the buffer is full;
     * constructs the Message in the buffer directly if it returns true.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param args ctor args
     * @tparam Message type
     * @tparam typename ... Args args
     * @return true if sent successfully
     */
    template <MessageC Message, typename ... Args>
    bool trySendInPlace(Args&&... args) {
        static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
            , "hasMemoryAttachment Messages cannot be sent in place");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        return buffer_.template tryPutInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }

    /**
     * @brief accessor - mostly used internally
     * @return underlying buffer used in the Context
     */
    Buffer& buffer() {
        return buffer_;
    }

    size_t dispatchingStartedCount() const {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        return *pDispStartCount_;
    }

    template <typename T, typename ...Args>
    std::shared_ptr<T> allocateInShm(size_t actualSize, Args&& ...args) {
        static_assert(std::is_trivially_destructible<T>::value);
        auto ptr = shmAttAllocator_->allocate(actualSize);
        auto ptrT = new (ptr) T{std::forward<Args>(args)...};
        // HMBDC_LOG_N(shmAttAllocator_->getHandle(ptr));
        return std::shared_ptr<T>(
            ptrT
            , [this](T* t) {
                if (0 == __atomic_sub_fetch(&t->hmbdc0cpyShmRefCount, 1, __ATOMIC_RELEASE)) {
                    shmAttAllocator_->deallocate((uint8_t*)t);
                }
            });
    }

protected:
    template <typename BoolLvOrRv>
    ThreadCommBase(uint32_t messageQueueSizePower2Num
        , size_t maxMessageSizeRuntime
        , char const* shmName
        , size_t offset
        , BoolLvOrRv&& tryOwn
        , size_t ipcShmForAttPoolSize)
    : allocator_(shmName
        , offset
        , Buffer::footprint(maxMessageSizeRuntime + sizeof(MessageHead)
            , messageQueueSizePower2Num) + SMP_CACHE_BYTES + sizeof(*pDispStartCount_)
        , tryOwn)
    , pDispStartCount_(allocator_.template allocate<size_t>(SMP_CACHE_BYTES, 0))
    , bufferptr_(allocator_.template allocate<Buffer>(SMP_CACHE_BYTES
        , maxMessageSizeRuntime + sizeof(MessageHead), messageQueueSizePower2Num
        , allocator_)
    )
    , buffer_(*bufferptr_) {
        if (messageQueueSizePower2Num < 2) {
            HMBDC_THROW(std::out_of_range
                , "messageQueueSizePower2Num need >= 2");
        }
        if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
            HMBDC_THROW(std::out_of_range
                , "can only set maxMessageSizeRuntime when template value MaxMessageSize is 0");
        }
        maxMessageSizeRuntime_ = maxMessageSizeRuntime;
        // primeBuffer<(cpa::create_ipc || (!cpa::create_ipc && !cpa::attach_ipc)) && cpa::has_pool>();
        if (((cpa::ipc && tryOwn) || !cpa::ipc) && cpa::has_pool) {
            markDeadFrom(buffer_, 0);
        }

        if (cpa::ipc && ipcShmForAttPoolSize) {
            using namespace boost::interprocess;
            auto name = std::string(shmName) + "-att-pool";
            if (tryOwn) {
                shm_unlink(name.c_str());
                shmAttAllocator_.emplace(
                    tryOwn, create_only, name.c_str(), ipcShmForAttPoolSize);
            } else {
                shmAttAllocator_.emplace(tryOwn, open_only, name.c_str());
            }
        }
    }

    ~ThreadCommBase() {
        allocator_.unallocate(bufferptr_);
    }

    static
    void markDeadFrom(pattern::MonoLockFreeBuffer& buffer, uint16_t) {
        // does not apply
    }

    template <typename BroadCastBuf>
    static
    void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
        for (uint16_t i = poolThreadCount;
            i < BroadCastBuf::max_parallel_consumer;
            ++i) {
            buffer.markDead(i);
        }
    }


    static
    void markDead(pattern::MonoLockFreeBuffer& buffer, std::list<uint16_t> slots) {
        // does not apply
    }

    template <typename BroadCastBuf>
    static
    void markDead(BroadCastBuf& buffer, std::list<uint16_t> slots) {
        for (auto s : slots) {
            buffer.markDead(s);
        }
    }

    Allocator allocator_;
    size_t* pDispStartCount_;
    Buffer* HMBDC_RESTRICT bufferptr_;
    Buffer& HMBDC_RESTRICT buffer_;

    struct ShmAttAllocator {
        template <typename Arg, typename ...Args>
        ShmAttAllocator(bool own, Arg&& arg, char const* name, Args&& ... args)
        : managedShm_(std::forward<Arg>(arg), name, std::forward<Args>(args)...) {
            if (own) {
                nameUnlink_ = name;
            }
        }

        ~ShmAttAllocator() {
            if (nameUnlink_.size()) {
                shm_unlink(nameUnlink_.c_str());
            }
        }

        boost::interprocess::managed_shared_memory::handle_t
        getHandle(void* localAddr) const {
            return managedShm_.get_handle_from_address(localAddr);
        }

        uint8_t*
        getAddr(boost::interprocess::managed_shared_memory::handle_t h) const {
            return (uint8_t*)managedShm_.get_address_from_handle(h);
        }

        uint8_t* allocate(size_t len) {
            auto res = (uint8_t*)managedShm_.allocate(len);
            // HMBDC_LOG_N(getHandle(res));
            return res;
        }

        auto deallocate(uint8_t* p) {
            // HMBDC_LOG_N(getHandle(p));
            return managedShm_.deallocate(p);
        }

    private:
        boost::interprocess::managed_shared_memory managedShm_;
        std::string nameUnlink_;
    };
    std::optional<ShmAttAllocator> shmAttAllocator_;

private:

    template <typename M, typename... Messages>
    void sendRecursive(typename Buffer::iterator it
        , M&& msg, Messages&&... msgs) {
        using Message = typename std::decay<M>::type;
        static_assert(std::is_trivially_destructible<Message>::value
            , "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
            , "hasMemoryAttachment Messages cannot be sent in group");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0
            && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        auto wrap = new (*it) MessageWrap<Message>(msg);
        if constexpr (has_hmbdc_ctx_queued_ts<Message>::value) {
            wrap->template get<Message>().hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
        } else (void)wrap;
        sendRecursive(++it, std::forward<Messages>(msgs)...);
    }
    void sendRecursive(typename Buffer::iterator) {}

    size_t maxMessageSizeRuntime_;
};

} //context_detail

/**
 * @example hmbdc.cpp
 * @example server-cluster.cpp
 * a partition Context rightfully doesn't contain a thread pool, and all its Clients
 * are in direct mode. Pool related interfaces are turned off at compile time
 */

/**
 * @class Context<>
 * @brief A Context is like a media object that facilitates the communications
 * for the Clients that it is holding.
 * a Client can only be added to (or started within) a single Context once;
 * undefined behavior otherwise.
 * the communication model is determined by the context_property;
 * by default it is broadcast in nature within the local process, indicated
 * by broadcast<>
 *
 * @details a broadcast Context contains a thread Pool powered by a number of OS threads.
 * a Client running in such a Context can either run in the pool mode or a direct mode
 * (which means the Client has its own dedicated OS thread).
 * direct mode provides faster responses, and pool mode provides more flexibility.
 * It is recommended that the total number of threads (pool threads + direct threads)
 * not exceed the number of available CPUs.
 * @tparam MaxMessageSize the max message size if known
 * at compile time (compile time sized);
 * if the value can only be determined at runtime (run time sized), set this to 0.
 * Things can still work but will lose some compile time checking advantages,
 * see maxMessageSizeRuntime below
 * @tparam ContextProperties see the context_property namespace
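 *
 * Declaration sketches (the parameter values below are illustrative):
 * @code
 * // compile time sized broadcast Context (the default property)
 * hmbdc::app::Context<64> ctx;
 *
 * // run time sized partition Context: 2^10-slot queue, 128-byte max message
 * hmbdc::app::Context<0, hmbdc::app::context_property::partition>
 *     pctx(10u, 0u, 128u);
 * @endcode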
 */
template <size_t MaxMessageSize = 0, typename... ContextProperties>
struct Context
: context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...> {
    using Base = context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...>;
    using Buffer = typename Base::Buffer;
    using cpa = typename Base::cpa;
    using Pool = typename std::conditional<cpa::pool_msgless
        , pattern::PoolMinus // msgless pool alternative; reconstructed - this line was lost in extraction
        , pattern::PoolT<Buffer>>::type;
    /**
     * @brief ctor for constructing a local non-ipc Context
     * @details won't compile if calling it for an ipc Context
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of size 1024
     * (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support,
     * only used when
     * a pool is supported in the Context with the broadcast property
     * @param maxMessageSizeRuntime if MaxMessageSize == 0, this value is used as
     * the max message size the Context can manage
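     *
     * A sketch of a typical construction:
     * @code
     * // compile time sized: 2^20-slot queue, pool sized for up to 128 Clients
     * hmbdc::app::Context<256> ctx(20u, 128u);
     * @endcode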
     */
    Context(uint32_t messageQueueSizePower2Num = MaxMessageSize?20:2
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize)
    : Base(messageQueueSizePower2Num < 2?2:messageQueueSizePower2Num
        , MaxMessageSize?MaxMessageSize:maxMessageSizeRuntime
        , nullptr, 0, false, 0)
    , usedHmbdcCapacity_(0)
    , stopped_(false)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0) {
        static_assert(!cpa::ipc, "no name specified for ipc Context");
    }

    /**
     * @brief ctor for constructing a local ipc Context
     * @details won't compile if calling it for a local non-ipc Context
     *
     * @param tryOwn input/output flag - whether the caller tries to be the owner of
     * this IPC Context - meaning the ctor is called on it and, when exiting, the IPC shm file is removed.
     * if true, try to create it: if created successfully, tryOwn stays true;
     * if it is already there, attach to it and set tryOwn to false;
     * if false, just attach to the existing transport;
     * @param ipcTransportName the id to identify an ipc transport that supports
     * a group of attached-together Contexts and their Clients
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of size
     * 1024 (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support,
     * only used when a pool is supported in the Context with the broadcast property
     * @param maxMessageSizeRuntime if MaxMessageSize == 0, this value is used as
     * the max message size the Context can manage
     * @param purgerCpuAffinityMask which CPUs to run the low profile (sleep mostly)
     * thread in charge of purging crashed Clients. Used only for ipc_creator Contexts.
     * @param ipcTransportDeviceOffset the offset in the ipcTransport dev for the region in use
     * @param ipcShmForAttPoolSize size of the shared memory pool used for message attachments
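     *
     * A sketch of the owner/attacher pattern (the transport name is illustrative):
     * @code
     * bool own = true;
     * hmbdc::app::Context<256, hmbdc::app::context_property::ipc_enabled>
     *     ctx(own, "md-transport", 20u);
     * // own == false here means another process had already created "md-transport"
     * @endcode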
     */
    template <typename BoolRvOrLv
        , std::enable_if_t<std::is_same<bool, typename std::decay<BoolRvOrLv>::type>::value>*
        = nullptr
    >
    Context(BoolRvOrLv&& tryOwn
        , char const* ipcTransportName
        , uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful
        , size_t ipcTransportDeviceOffset = 0
        , size_t ipcShmForAttPoolSize = 0)
    : Base(messageQueueSizePower2Num
        , MaxMessageSize?MaxMessageSize:maxMessageSizeRuntime
        , ipcTransportName
        , ipcTransportDeviceOffset, tryOwn, ipcShmForAttPoolSize)
    , usedHmbdcCapacity_(0)
    , stopped_(false)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0)
    , secondsBetweenPurge_(60)
    , ifIpcCreator_(tryOwn)
    , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
        static_assert(cpa::ipc, "ctor can only be used with an ipc enabled Context");
    }

    /**
     * @brief dtor
     * @details if this Context owns the ipc transport, notify all attached processes
     * that read from it that this transport is dead
     */
    ~Context() {
        if (ifIpcCreator_) {
            Base::markDeadFrom(this->buffer_, 0);
        }
        stop();
        join();
    }

    /**
     * @brief add a client to the Context's pool - the Client is run in pool mode
     * @details if the pool is already started, the client starts getting current Messages immediately
     * - it might miss older messages.
     * if the pool is not started yet, the Client does not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the Pool
     * @param poolThreadAffinityIn the pool is powered by a number of threads
     * (a thread in the pool is identified (by a number) in the mask starting from bit 0);
     * it is possible to have a Client use just some of the threads in the Pool
     * - defaults to using all.
     *
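     * A sketch (MyClient is an assumed type derived from hmbdc::app::Client<>):
     * @code
     * MyClient c1, c2;
     * ctx.addToPool(c1);        // c1 may run on any pool thread
     * ctx.addToPool(c2, 0x1ul); // pin c2 to pool thread 0
     * ctx.start(2, 0x3ul);      // pool powered by 2 threads on CPU 0 and CPU 1
     * @endcode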
     */
    template <typename Client>
    void addToPool(Client &client
        , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        if (std::is_base_of<single_thread_powered_client, Client>::value
            && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
            && poolThreadCount_ != 1) {
            HMBDC_THROW(std::out_of_range
                , "cannot add a single thread powered client to the non-single"
                "thread powered pool without specifying a single thread poolThreadAffinity"
            );
        }
        primeForShmAtt(client);
        auto stub = new context_detail::PoolConsumerProxy<Client>(client, this->pDispStartCount_);
        pool_->addConsumer(*stub, poolThreadAffinityIn);

    }
882  /**
883  * @brief add a bunch of clients to Context's pool - the Clients are run in pool mode
884  * @details if pool is already started, the client is to get current Messages immediatly
885  * - might miss older messages.
886  * if the pool not started yet, the Client does not get messages or other callbacks until
887  * the Pool starts.
888  * This function is threadsafe, which means you can call it anywhere in the code
889  * @tparam Client client type
890  * @param client to be added into the Pool
891  * @param poolThreadAffinityIn pool is powered by a number of threads
892  * (thread in the pool is identified (by a number) in the mask starting from bit 0)
893  * it is possible to have a Client to use just some of the threads in the Pool
894  * - default to use all.
895  * @param args more client and poolThreadAffinityIn pairs can follow
896  */
897  template <typename Client, typename ... Args>
898  void addToPool(Client &client
899  , uint64_t poolThreadAffinityIn, Args&& ...args) {
900  addToPool(client, poolThreadAffinityIn);
901  addToPool(std::forward<Args>(args)...);
902  }
903 
904  /**
905  * @brief add a bunch of clients to Context's pool - the Clients are run in pool mode
906  * @details the implementatiotn tells all
907  * if the pool not started yet, the Client does not get messages or other callbacks until
908  * the Pool starts.
909  * This function is threadsafe, which means you can call it anywhere in the code
910  * @tparam Client client type
911  * @tparam Client2 client2 type
912  * @param client to be added into the Pool using default poolThreadAffinity
913  * @param client2 to be added into the Pool
914  * @param args more client (and/or poolThreadAffinityIn pairs can follow
915  */
916  template <typename Client, typename Client2, typename ... Args, typename Enabled
917  = typename std::enable_if<!std::is_integral<Client2>::value, void>::type>
918  void
919  addToPool(Client &client, Client2 &client2, Args&& ...args) {
920  addToPool(client);
921  addToPool(client2, std::forward<Args>(args)...);
922  }

    /**
     * @brief return the number of clients added into the pool
     * @details the number could change since clients could be added in another thread
     * @return client count
     */
    size_t clientCountInPool() const {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        return pool_->consumerSize();
    }

    /**
     * @brief how many parallel consumers are started
     * @details the dynamic value could change after the call returns;
     * see the max_parallel_consumer Context property
     * @return how many parallel consumers are started
     */
    size_t parallelConsumerAlive() const {
        return this->buffer_.parallelConsumerAlive();
    }
    /**
     * @brief start the Context by specifying what is in it (Pool and/or direct Clients)
     * and their paired-up cpu affinities.
     * @details All direct mode Clients, or clients in a pool, started by a single start
     * statement are dispatched starting from the same event
     * (subject to the event filtering of each client).
     * Many compile time and runtime checks are done, for example:
     * it won't compile if a pool is started in a Context that does not support one;
     * an exception is thrown if the Context capacity is reached or a second pool is started, etc.
     *
     * Usage example:
     *
     * @code
     * // the following starts the pool powered by 3 threads that are affinitied to
     * // the lower 8 CPUs; client0 affinitied to the 4th CPU and client1 affinitied to the 5th CPU
     * ctx.start(3, 0xfful, client0, 0x8ul, client1, 0x10ul);
     *
     * // the following starts the pool powered by 3 threads that are affinitied to
     * // all existing CPUs; client0 affinitied to a rotating CPU and
     * // client1 affinitied to the 5th CPU
     * ctx.start(3, 0, client0, 0, client1, 0x10ul);
     *
     * // the following starts 2 direct mode Clients (client2 and client3)
     * ctx.start(client2, 0x3ul, client3, 0xful);
     * @endcode
     *
     * @tparam typename ...Args types
     *
     * @param args paired-up args in the form of (pool-thread-count|client, cpuAffinity)*.
     * see the examples above.
     * If a cpuAffinity is 0, each thread's affinity rotates to one of the CPUs in the system.
     */
    template <typename ...Args>
    void
    start(Args&& ... args) {
        startWithContextProperty<cpa>(true, std::forward<Args>(args) ...);
    }

    /**
     * @brief similar to the start call in usage and parameters, except the Client thread
     * is not started by this call and the user is expected to use runOnce() to
     * manually power the Client externally
     *
     * @tparam Args see start above
     * @param args see start above
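     *
     * Manual-loop sketch (MyClient and keepRunning are assumptions):
     * @code
     * MyClient c;
     * ctx.registerToRun(c, 0x1ul);             // registered, but no thread kicked off
     * while (keepRunning && ctx.runOnce(c)) {} // the caller powers the dispatching
     * @endcode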
     */
    template <typename ...Args>
    void
    registerToRun(Args&& ... args) {
        startWithContextProperty<cpa>(false, std::forward<Args>(args) ...);
    }

    /**
     * @brief stop the message dispatching - asynchronously
     * @details asynchronously means it is not guaranteed that message dispatching
     * stops immediately after this non-blocking call
     */
    void
    stop() {
        stopWithContextProperty<cpa>();
    }

    /**
     * @brief wait until all threads (Pool threads too, if applicable) of the Context exit
     * @details blocking call
     */
    void
    join() {
        joinWithContextProperty<cpa>();
    }

    /**
     * @brief an ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...)
     * Clients from the ipc transport to keep the ipc transport healthy (avoiding buffer full).
     * It periodically looks for things to purge. This sets the period (default is 60 seconds).
     * @details If some Clients are known to
     * take long to process messages, increase it. If you need to remove slow Clients quickly,
     * reduce it.
     * Only effective for an ipc_creator Context.
     *
     * @param s seconds - if set to zero, the purger is disabled
     */
    void
    setSecondsBetweenPurge(uint32_t s) {
        secondsBetweenPurge_ = s;
    }

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop in its pool
     *
     * @param threadSerialNumberInPool starting from 0, indicates which thread in the pool
     * is powering the loop
     */
    void
    runOnce(uint16_t threadSerialNumberInPool) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        pool_->runOnce(threadSerialNumberInPool);
    }

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop for a direct mode Client
     *
     * @param c the Client
     * @return true when the Client did not terminate itself by throwing an exception
     * @return false otherwise
     */
    template <typename Client>
    bool runOnce(Client&& c) {
        // c.messageDispatchingStarted(
        //     hmbdcNumbers_[threadSerialNumber]); //lower level ensures executing only once
        uint16_t tn = cpa::broadcast_msg?c.hmbdcNumber:0;
        primeForShmAtt(c);

        return context_detail::runOnceImpl(tn
            , stopped_, this->buffer_
            , c);
    }

private:
    template <typename cpa>
    typename std::enable_if<cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(this->buffer(), maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t) {
        return typename Pool::ptr();
    }

    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    stopWithContextProperty() {
        if (pool_) pool_->stop();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    stopWithContextProperty() {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    joinWithContextProperty() {
        if (pool_) pool_->join();
        for (auto& t : threads_) {
            t.join();
        }
        threads_.clear();
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    joinWithContextProperty() {
        for (auto& t : threads_) {
            t.join();
        }
        threads_.clear();
    }

    template <typename cpa>
    void
    reserveSlots(std::list<uint16_t>&) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        auto available = this->buffer_.unusedConsumerIndexes();
        if (available.size() < poolThreadCount) {
            HMBDC_THROW(std::out_of_range
                , "Context remaining capacity = " << available.size()
                << ", consider increasing max_parallel_consumer");
        }
        for (uint16_t i = 0; i < poolThreadCount; ++i) {
            slots.push_back(available[i]);
            this->buffer_.reset(available[i]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg || cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
        const bool clientParticipateInMessaging =
            std::decay<CcClient>::type::INTERESTS_SIZE != 0;
        if (clientParticipateInMessaging) {
            auto available = this->buffer_.unusedConsumerIndexes();
            if (!available.size()) {
                HMBDC_THROW(std::out_of_range
                    , "Context reached capacity, consider increasing max_parallel_consumer");
            }
            this->buffer_.reset(available[0]);
            slots.push_back(available[0]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::ipc, void>::type
    startWithContextProperty(bool kickoffThread, Args&& ... args) {
        auto& lock = this->allocator_.fileLock();
        std::lock_guard<decltype(lock)> g(lock);
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, args ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(kickoffThread, sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::ipc, void>::type
    startWithContextProperty(bool kickoffThread, Args&& ... args) {
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, args ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(kickoffThread, sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }

    template <typename cpa>
    typename std::enable_if<cpa::broadcast_msg && cpa::ipc, void>::type
    startWithContextPropertyImpl(bool kickoffThread, std::list<uint16_t>& slots) {
        if (ifIpcCreator_ && !purger_ && secondsBetweenPurge_) {
            purger_.reset(
                new StuckClientPurger<Buffer>(secondsBetweenPurge_, this->buffer_));
            startWithContextPropertyImpl<cpa>(kickoffThread, slots, *purger_, purgerCpuAffinityMask_);
        }
    }

    template <typename cpa>
    typename std::enable_if<!cpa::broadcast_msg || !cpa::ipc, void>::type
    startWithContextPropertyImpl(bool kickoffThread, std::list<uint16_t>& slots) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::has_pool, void>::type
    startWithContextPropertyImpl(bool kickoffThread, std::list<uint16_t>& slots
        , uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        using namespace std;
        if (poolThreadCount_) {
            HMBDC_THROW(std::out_of_range, "Context pool already started");
        }
        std::vector<uint16_t> sc(slots.begin(), slots.end());
        if (!poolThreadsCpuAffinityMask) {
            auto cpuCount = std::thread::hardware_concurrency();
            poolThreadsCpuAffinityMask =
                ((1ul << poolThreadCount) - 1u) << (hmbdcNumbers_.size() % cpuCount);
        }

        pool_->startAt(poolThreadCount, poolThreadsCpuAffinityMask, sc);
        poolThreadCount_ = poolThreadCount; // record the count before the countdown below consumes it
        while(poolThreadCount--) {
            if (!cpa::pool_msgless) {
                hmbdcNumbers_.push_back(*slots.begin());
                slots.pop_front();
            }
        }
        startWithContextPropertyImpl<cpa>(kickoffThread, slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename Client, typename ...Args>
    typename std::enable_if<!std::is_integral<Client>::value, void>::type
    startWithContextPropertyImpl(bool kickoffThread, std::list<uint16_t>& slots
        , Client& c, uint64_t cpuAffinity
        , Args&& ... args) {
        auto clientParticipateInMessaging =
            std::decay<Client>::type::INTERESTS_SIZE;
        uint16_t hmbdcNumber = 0xffffu;
        if (clientParticipateInMessaging && cpa::broadcast_msg) {
            hmbdcNumber = *slots.begin();
            c.hmbdcNumber = hmbdcNumber;
            slots.pop_front();
        }
        if (kickoffThread) {
            auto thrd = kickOffClientThread(
                c, cpuAffinity, hmbdcNumber, hmbdcNumbers_.size());
            threads_.push_back(move(thrd));
        }
        hmbdcNumbers_.push_back(hmbdcNumber);
        startWithContextPropertyImpl<cpa>(kickoffThread, slots, std::forward<Args>(args) ...);
    }

    template <typename Client>
    auto kickOffClientThread(
        Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
        primeForShmAtt(c);
        std::thread thrd([
            this
            , &c
            , mask
            , h=hmbdcNumber
            , threadSerialNumber
        ]() {
            auto hmbdcNumber = h;
            std::string name;
            char const* schedule;
            int priority;
            auto clientParticipateInMessaging =
                std::decay<Client>::type::INTERESTS_SIZE;


            if (c.hmbdcName()) {
                name = c.hmbdcName();
            } else {
                if (clientParticipateInMessaging) {
                    name = "hmbdc" + std::to_string(hmbdcNumber);
                } else {
                    name = "hmbdc-x";
                }
            }
            try {
                auto cpuAffinityMask = mask;
                std::tie(schedule, priority) = c.schedSpec();

                if (!schedule) schedule = "SCHED_OTHER";

                if (!mask) {
                    auto cpuCount = std::thread::hardware_concurrency();
                    cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
                }

                hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
                    , schedule, priority);

                hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
                __atomic_add_fetch(this->pDispStartCount_, 1, __ATOMIC_RELEASE);
                c.messageDispatchingStartedCb(this->pDispStartCount_);
            } catch (std::exception const& e) {
                c.stopped(e);
                return;
            } catch (int code) {
                c.stopped(ExitCode(code));
                return;
            } catch (...) {
                c.stopped(UnknownException()); // reconstructed: the original catch-all body was lost in extraction
                return;
            }

            while(!stopped_ &&
                context_detail::runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
            }
            if (this->stopped_) {
                if (clientParticipateInMessaging) { /// drain all to release the sending party
                    typename Buffer::iterator begin, end;
                    size_t count;
                    do {
                        usleep(10000);
                        if constexpr (cpa::broadcast_msg) {
                            count = this->buffer_.peek(hmbdcNumber, begin, end);
                            this->buffer_.wasteAfterPeek(hmbdcNumber, count);
                        } else {
                            count = this->buffer_.peek(begin, end);
                            this->buffer_.wasteAfterPeek(begin, count);
                        }
                    } while (count);
                }
                c.dropped();
            }
            if (clientParticipateInMessaging) context_detail::unblock(this->buffer_, hmbdcNumber);
        }
        );

        return thrd;
    }

    template <typename Client>
    void primeForShmAtt(Client& c) {
        if constexpr (cpa::ipc && context_detail::has_ibmaProc<Client>::value) {
            if constexpr(!std::is_same<std::nullptr_t, decltype(c.ibmaProc)>::value) {
                if (!c.ibmaProc.hmbdcShmHandleToAddr) {
                    c.ibmaProc.hmbdcShmHandleToAddr = [&alloc = this->shmAttAllocator_]
                        (boost::interprocess::managed_shared_memory::handle_t h) {
                        return alloc->getAddr(h);
                    };
                    c.ibmaProc.hmbdcShmDeallocator = [&alloc = this->shmAttAllocator_]
                        (uint8_t* addr) {
                        return alloc->deallocate(addr);
                    };
                }
            }
        }
    }

    Context(Context const&) = delete;
    Context& operator = (Context const&) = delete;
    uint16_t usedHmbdcCapacity_;
    std::vector<uint16_t> hmbdcNumbers_;

    bool stopped_;
    typename Pool::ptr pool_;
    using Threads = std::vector<std::thread>;
    Threads threads_;
    size_t poolThreadCount_;
    uint32_t secondsBetweenPurge_;
    bool const ifIpcCreator_ = false;
    uint64_t purgerCpuAffinityMask_;
    typename std::conditional<cpa::broadcast_msg && cpa::ipc
        , std::unique_ptr<StuckClientPurger<Buffer>>, uint32_t
    >::type purger_;
};

}}