Context.hpp
#include "hmbdc/Copyright.hpp"
#pragma once


#include "hmbdc/app/StuckClientPurger.hpp"
#include "hmbdc/app/RequestReply.hpp"
#include "hmbdc/Config.hpp"
#include "hmbdc/numeric/BitMath.hpp"
#include "hmbdc/time/Time.hpp"

#include <memory>
#include <vector>
#include <list>
#include <mutex>
namespace hmbdc { namespace app {


/**
 * @example hello-world.cpp
 * @example hmbdc.cpp
 * @example hmbdc-log.cpp
 * @example ipc-market-data-propagate.cpp
 */

/**
 * @namespace hmbdc::app::context_property
 * contains the trait types that define how a Context behaves and what capabilities it has
 */
namespace context_property {
    /**
     * @class broadcast
     * @brief Context template parameter indicating each message is
     * sent to all clients within the Context.
     * This is the default property of a Context.
     * @details each message is still subject to the Client's message type
     * filtering. In the case of an ipc Context,
     * it is also sent to all clients in the attached ipc Contexts.
     * When a Context is specialized using this type, the Context normally
     * works with heterogeneous Clients and all Clients can talk to each
     * other thru the Context. Load balancing among Clients can be achieved by
     * having the participating Clients coordinate which messages each one processes.
     * In addition to the direct mode Clients, a Client running pool is supported
     * by the Context - see the pool related functions in Context.
     *
     * Implicit usage in hello-world.cpp: @snippet hello-world.cpp broadcast as default
     * Explicit usage in hmbdc.cpp: @snippet hmbdc.cpp explicit using broadcast
     * There is no hard coded limit on how many Clients can be added into a pool.
     * Also, there is no limit on when you can add a Client into a pool.
     * @tparam max_parallel_consumer the max count of threads that process messages;
     * it includes the pool threads plus the count of direct mode Clients that
     * register for messages within the Context
     * supported values: 4 (default)
     * 2, 8, 16, 32, 64 require an hmbdc license
     */
    template <uint16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
    struct broadcast{
        static_assert(max_parallel_consumer >= 4u
            , "unsupported max_parallel_consumer"); //assert message assumed
    };
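
    // Usage sketch (illustrative, not part of this header): declaring a broadcast
    // Context for up to 4 parallel consumers; MarketData stands for an assumed
    // user-defined, trivially-destructible message type.
    //
    //   hmbdc::app::Context<sizeof(MarketData)
    //       , hmbdc::app::context_property::broadcast<4>> ctx;
    //   // every started Client of ctx receives every message, subject to type filtering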

    /**
     * @class partition
     * @brief Context template parameter indicating each message is
     * sent to one and only one of the clients within the Context
     * and its attached ipc Contexts, if applicable.
     * @details each message is still subject to the Client's message type
     * filtering.
     * When a Context is specialized using this type, the Context normally
     * works with homogeneous Clients to achieve load balancing across threads. No
     * coordination is needed between Clients.
     * Only direct mode Clients are supported; a thread pool is NOT supported
     * by this kind of Context - the pool related functions in Context are disabled as well.
     *
     * Example in server-cluster.cpp: @snippet server-cluster.cpp declare a partition context
     */
    struct partition{};

    /**
     * @class msgless_pool
     * @brief Context template parameter indicating the Context must contain a pool to run Clients,
     * and the Clients in the pool shall not receive messages - unlike the default pool.
     * @details msgless_pool performs better when its Clients don't need to receive messages from the Context.
     * This is useful when the Clients are network transport engines. For semantic reasons a partition
     * Context doesn't come with a pool by default, but this Context property enables a pool that
     * does not deliver messages.
     */
    struct msgless_pool{};
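
    // Usage sketch (illustrative): a partition Context that still needs a pool to run
    // message-less engine Clients can combine the two properties; the ctor args follow
    // the run time sized (MaxMessageSize == 0) constructor below.
    //
    //   hmbdc::app::Context<0
    //       , hmbdc::app::context_property::partition
    //       , hmbdc::app::context_property::msgless_pool> ctx(10, 8, 1000);
    //   // the pool powers the Clients' callbacks but never delivers messages to them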

    /**
     * @class ipc_creator
     * @brief Context template parameter indicating the Context is ipc enabled and
     * can be attached to (see ipc_attacher below) as an ipc transport (thru its name).
     * @details in addition to the normal Context functions, the Context acts as
     * the creator (owner) of the named ipc transport.
     * Since it performs the critical function of purging crashed or
     * stuck Clients to avoid filling the buffer for the other well-behaving Clients, it is
     * expected to be running (started) for as long as the ipc transport functions.
     * The ipc transport uses persistent shared memory, and if the dtor of the Context is not called
     * due to a crash, there will be stale shared memory left in /dev/shm.
     * Example in ipc-market-data-propagate.cpp: @snippet ipc-market-data-propagate.cpp declare an ipc context
     */
    struct ipc_creator{};

    /**
     * @class ipc_attacher
     * @brief Context template parameter indicating the Context is ipc enabled and
     * can attach to an ipc transport thru a name.
     * @details it is very important that the Context is constructed with exactly
     * the same size (see constructor) and type (partition vs broadcast) as specified by the ipc
     * transport creator (the ipc_creator Context).
     * All Contexts attached to a single ipc transport are collectively subject to the
     * max_parallel_consumer limit, just like a single local (non-ipc) Context is.
     * Example in ipc-market-data-propagate.cpp: @snippet ipc-market-data-propagate.cpp declare an ipc context
     */
    struct ipc_attacher{};

    /**
     * @class pci_ipc
     * @brief when processes are distributed on a PCIe board and the host PC, add this property
     * @details beta
     *
     */
    struct pci_ipc{};
}
}}

#include "hmbdc/app/ContextDetail.hpp"
namespace hmbdc { namespace app {

namespace context_detail {
using namespace std;
using namespace hmbdc::pattern;

/**
 * @class ThreadCommBase<>
 * @brief covers the inter-thread and ipc communication facade
 * @details this type's interface is exposed thru Context and the type itself is
 * not directly used by users
 * @tparam MaxMessageSize the max message size, needed at compile time;
 * if the value can only be determined at runtime, set this to 0. Things can still work,
 * but you will lose some compile time checking advantages, see maxMessageSizeRuntime below
 * @tparam ContextProperties see the types in the context_property namespace
 */
template <size_t MaxMessageSize, typename... ContextProperties>
struct ThreadCommBase
: private context_detail::context_property_aggregator<ContextProperties...> {
    using cpa = context_property_aggregator<ContextProperties...>;
    using Buffer = typename cpa::Buffer;
    using Allocator = typename cpa::Allocator;

    enum {
        MAX_MESSAGE_SIZE = MaxMessageSize,
        BUFFER_VALUE_SIZE = MaxMessageSize + 8u, //8 bytes for wrap
    };

    size_t maxMessageSize() const {
        if (MaxMessageSize == 0) return maxMessageSizeRuntime_;
        return MaxMessageSize;
    }

    /**
     * @brief send a batch of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message types will get them, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
     */
    template <MessageC M0, MessageC M1, typename ... Messages, typename Enabled
        = typename std::enable_if<!std::is_integral<M1>::value, void>::type>
    void
    send(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.claim(n);
        sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
        buffer_.commit(it, n);
    }

    /**
     * @brief try to send a batch of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param msgs messages
     * @tparam Messages message types
     *
     * @return true if sent successfully
     */
    template <MessageC M0, MessageC M1, typename ... Messages, typename Enabled
        = typename std::enable_if<!std::is_integral<M1>::value, void>::type>
    bool
    trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.tryClaim(n);
        if (it) {
            sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
            buffer_.commit(it, n);
            return true;
        }

        return false;
    }
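
    // Usage sketch for the batch send calls above (illustrative; Msg0 and Msg1
    // stand for assumed trivially-destructible message types):
    //
    //   Msg0 m0; Msg1 m1;
    //   ctx.send(m0, m1);           // blocks until the slots are claimed, then commits both
    //   if (!ctx.trySend(m0, m1)) { // non-blocking, transactional - all or none
    //       // buffer full - e.g. retry later
    //   }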

    /**
     * @brief send a range of messages to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message type will get them, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
     */
    template <MessageForwardIterC ForwardIt>
    void
    send(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.claim(n);
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
                new (*it++) MessageWrap<Message>(*begin++);
            }
            buffer_.commit(bit, n);
        }
    }

    /**
     * @brief try to send a range of messages to the Context or attached ipc Contexts
     * @details this call does not block and it is transactional - send all or none.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param begin a forward iterator pointing at the start of the range
     * @param n length of the range
     * @return true if sent successfully
     */
    template <MessageForwardIterC ForwardIt>
    bool
    trySend(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.tryClaim(n);
            if (hmbdc_unlikely(!bit)) return false;
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename iterator_traits<ForwardIt>::value_type;
                static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
                new (*it++) MessageWrap<Message>(*begin++);
            }
            buffer_.commit(bit, n);
        }
        return true;
    }
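
    // Usage sketch for the range send calls above (illustrative; Tick stands for an
    // assumed trivially-destructible message type):
    //
    //   std::vector<Tick> ticks(16);
    //   ctx.send(ticks.begin(), ticks.size());            // blocks until the whole range fits
    //   while (!ctx.trySend(ticks.begin(), ticks.size())) {
    //       // non-blocking all-or-none - spin, yield or drop as the application sees fit
    //   }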

    /**
     * @brief send a message to the Context or attached ipc Contexts
     * @details only the Clients that handle the Message type will get it, of course.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param m message
     * @tparam Message type
     */
    template <MessageC Message>
    void send(Message&& m) {
        using M = typename std::decay<Message>::type;
        static_assert(std::is_trivially_destructible<M>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        buffer_.put(MessageWrap<M>(std::forward<Message>(m)));
    }

    /**
     * @brief try to send a message to the Context or attached ipc Contexts if it wouldn't block
     * @details this call does not block - it returns false when the buffer is full.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param m message
     * @tparam Message type
     * @return true if sent successfully
     */
    template <MessageC Message>
    bool trySend(Message&& m) {
        using M = typename std::decay<Message>::type;
        static_assert(std::is_trivially_destructible<M>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        return buffer_.tryPut(MessageWrap<M>(std::forward<Message>(m)));
    }

    /**
     * @brief send a message to all Clients in the Context or attached ipc Contexts
     * @details constructs the Message in the buffer directly.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param args ctor args
     * @tparam Message type
     * @tparam Args types of the ctor args
     */
    template <MessageC Message, typename ... Args>
    void sendInPlace(Args&&... args) {
        static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        buffer_.template putInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }


    /**
     * @brief try to send a message to all Clients in the Context or attached ipc Contexts if it wouldn't block
     * @details this call does not block - it returns false when the buffer is full;
     * the Message is constructed in the buffer directly if it returns true.
     * This function is threadsafe, which means you can call it anywhere in the code
     *
     * @param args ctor args
     * @tparam Message type
     * @tparam Args types of the ctor args
     * @return true if sent successfully
     */
    template <MessageC Message, typename ... Args>
    bool trySendInPlace(Args&&... args) {
        static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        return buffer_.template tryPutInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }
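
    // Usage sketch for the in-place send calls above (illustrative; Quote stands for
    // an assumed trivially-destructible message type constructible from (symbol, px)):
    //
    //   ctx.sendInPlace<Quote>("IBM", 100.5);           // constructs the Quote in the buffer
    //   if (!ctx.trySendInPlace<Quote>("IBM", 100.5)) {
    //       // buffer full - nothing was constructed or sent
    //   }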

    /**
     * @brief accessor - mostly used internally
     * @return underlying buffer used in the Context
     */
    Buffer& buffer() {
        return buffer_;
    }


protected:
    ThreadCommBase(uint32_t messageQueueSizePower2Num
        , size_t maxMessageSizeRuntime
        , char const* shmName
        , size_t offset)
    : allocator_(shmName
        , offset
        , Buffer::footprint(maxMessageSizeRuntime + 8u
            , messageQueueSizePower2Num)
        , O_RDWR | O_SYNC | (cpa::create_ipc?O_CREAT:0)
    )
    , bufferptr_(allocator_.template allocate<Buffer>(SMP_CACHE_BYTES
        , maxMessageSizeRuntime + 8u, messageQueueSizePower2Num
        , allocator_)
    )
    , buffer_(*bufferptr_) {
        if (messageQueueSizePower2Num < 2) {
            HMBDC_THROW(std::out_of_range
                , "messageQueueSizePower2Num need >= 2");
        }
        if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
            HMBDC_THROW(std::out_of_range
                , "can only set maxMessageSizeRuntime when template value MaxMessageSize is 0");
        }
        maxMessageSizeRuntime_ = maxMessageSizeRuntime;
        primeBuffer<(cpa::create_ipc || (!cpa::create_ipc && !cpa::attach_ipc)) && cpa::has_pool>();
        if (cpa::create_ipc || cpa::attach_ipc) {
            sleep(2);
        }
    }

    ~ThreadCommBase() {
        allocator_.unallocate(bufferptr_);
    }

    static
    void markDeadFrom(pattern::MonoLockFreeBuffer& buffer, uint16_t) {
        // does not apply
    }

    template <typename BroadCastBuf>
    static
    void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
        for (uint16_t i = poolThreadCount;
            i < BroadCastBuf::max_parallel_consumer;
            ++i) {
            buffer.markDead(i);
        }
    }


    static
    void markDead(pattern::MonoLockFreeBuffer& buffer, std::list<uint16_t> slots) {
        // does not apply
    }

    template <typename BroadCastBuf>
    static
    void markDead(BroadCastBuf& buffer, std::list<uint16_t> slots) {
        for (auto s : slots) {
            buffer.markDead(s);
        }
    }

    Allocator allocator_;
    Buffer* HMBDC_RESTRICT bufferptr_;
    Buffer& HMBDC_RESTRICT buffer_;

private:
    template <bool doIt>
    typename std::enable_if<doIt, void>::type
    primeBuffer() {
        markDeadFrom(buffer_, 0);
    }

    template <bool doIt>
    typename std::enable_if<!doIt, void>::type
    primeBuffer() {
    }

    template <typename M, typename... Messages>
    void sendRecursive(typename Buffer::iterator it
        , M&& msg, Messages&&... msgs) {
        using Message = typename std::decay<M>::type;
        static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        new (*it) MessageWrap<Message>(std::forward<M>(msg));
        sendRecursive(++it, std::forward<Messages>(msgs)...);
    }
    void sendRecursive(typename Buffer::iterator) {}

    size_t maxMessageSizeRuntime_;
};

} //context_detail

/**
 * @example hmbdc.cpp
 * @example server-cluster.cpp
 * a partition Context rightfully doesn't contain a thread pool, and all its Clients
 * are in direct mode. Pool related interfaces are turned off at compile time
 */

/**
 * @class Context<>
 * @brief A Context is like a media object that facilitates the communications
 * for the Clients that it is holding.
 * a Client can only be added to (or started within) a single Context once,
 * undefined behavior otherwise.
 * the communication model is determined by the context_property;
 * by default it is of a broadcast nature within the local process, indicated
 * by broadcast<>
 *
 * @details a broadcast Context contains a thread Pool powered by a number of OS threads.
 * a Client running in such a Context can run in either the pool mode or a direct mode
 * (which means the Client has its own dedicated OS thread).
 * direct mode provides faster responses, and pool mode provides more flexibility.
 * It is recommended that the total number of threads (pool threads + direct threads)
 * not exceed the number of available CPUs.
 * @tparam MaxMessageSize the max message size if known
 * at compile time (compile time sized);
 * if the value can only be determined at runtime (run time sized), set this to 0.
 * Things can still work but you will lose some compile time checking advantages,
 * see maxMessageSizeRuntime below
 * @tparam ContextProperties see the context_property namespace
 */
template <size_t MaxMessageSize = 0, typename... ContextProperties>
struct Context
: context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...> {
    using Base = context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...>;
    using Buffer = typename Base::Buffer;
    using cpa = typename Base::cpa;
    using Pool = typename std::conditional<cpa::pool_msgless
        , pattern::PoolMinus
        , pattern::PoolT<Buffer>>::type;
    /**
     * @brief ctor to construct a local non-ipc Context
     * @details won't compile if called for an ipc Context
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of size 1024 (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support; only used when
     * a pool is supported in the Context (broadcast property)
     * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
     * @param maxThreadSerialNumber the max number of threads (direct mode clients plus pool threads)
     * the context can manage
     */
    Context(uint32_t messageQueueSizePower2Num = MaxMessageSize?20:2
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , size_t maxThreadSerialNumber = 64)
    : Base(messageQueueSizePower2Num < 2?2:messageQueueSizePower2Num
        , maxMessageSizeRuntime, nullptr, 0)
    , usedHmbdcCapacity_(0)
    , stopped_(false)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0) {
        static_assert(!cpa::create_ipc && !cpa::attach_ipc
            , "no name specified for ipc Context");
    }
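
    // Construction sketch (illustrative): a compile time sized local Context vs a
    // run time sized one; MyMsg stands for an assumed message type.
    //
    //   hmbdc::app::Context<sizeof(MyMsg)> ctx0;             // queue of 2^20 slots, pool up to 128 Clients
    //   hmbdc::app::Context<> ctx1(20, 128, sizeof(MyMsg));  // MaxMessageSize == 0, sized at runtime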

    /**
     * @brief ctor to construct a local ipc Context
     * @details won't compile if called for a local non-ipc Context
     *
     * @param ipcTransportName the id to identify an ipc transport that supports
     * a group of Contexts attached together and their Clients
     * @param messageQueueSizePower2Num a value of 10 gives a message queue of size
     * 1024 (messages, not bytes)
     * @param maxPoolClientCount up to how many Clients the pool is supposed to support;
     * only used when a pool is supported in the Context (broadcast property)
     * @param maxMessageSizeRuntime if MaxMessageSize is set to 0, this value is used
     * @param purgerCpuAffinityMask which CPUs to run the low profile (sleep mostly)
     * thread in charge of purging crashed Clients. Used only for ipc_creator Contexts.
     * @param maxThreadSerialNumber the max number of threads (direct mode clients plus pool threads)
     * the context can manage
     * @param ipcTransportDeviceOffset the offset in the ipcTransport dev for the region to use
     */
    Context(char const* ipcTransportName
        , uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful
        , size_t maxThreadSerialNumber = 64
        , size_t ipcTransportDeviceOffset = 0)
    : Base(messageQueueSizePower2Num, maxMessageSizeRuntime, ipcTransportName, ipcTransportDeviceOffset)
    , usedHmbdcCapacity_(0)
    , stopped_(false)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0)
    , secondsBetweenPurge_(60)
    , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
        static_assert(cpa::create_ipc || cpa::attach_ipc
            , "ctor can only be used with an ipc enabled Context");
        static_assert(!(cpa::create_ipc && cpa::attach_ipc)
            , "Context cannot be both ipc_creator and ipc_attacher");
    }
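
    // Usage sketch (illustrative): an ipc creator process and an attacher process;
    // both sides must agree on the size parameters and the Context type.
    //
    //   // process A - owns the ipc transport named "md"
    //   hmbdc::app::Context<64, hmbdc::app::context_property::ipc_creator> ctxA("md", 20);
    //   // process B - attaches to it; must be constructed with the same sizes
    //   hmbdc::app::Context<64, hmbdc::app::context_property::ipc_attacher> ctxB("md", 20);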

    /**
     * @brief dtor
     * @details if this Context owns the ipc transport, notify all attached processes
     * that read from it that this transport is dead
     */
    ~Context() {
        if (cpa::create_ipc) {
            Base::markDeadFrom(this->buffer_, 0);
        }
        stop();
        join();
    }

    /**
     * @brief add a client to the Context's pool - the Client is run in pool mode
     * @details if the pool is already started, the client starts getting current Messages immediately
     * - it might miss older messages.
     * if the pool is not started yet, the Client does not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the Pool
     * @param poolThreadAffinityIn the pool is powered by a number of threads
     * (a thread in the pool is identified (by a number) in the mask starting from bit 0);
     * it is possible to have a Client use just some of the threads in the Pool
     * - defaults to using all.
     *
     */
    template <typename Client>
    void addToPool(Client &client
        , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        static_assert(std::tuple_size<typename Client::Replies>::value == 0
            , "cannot add a Client that blocks into the pool");
        if (std::is_base_of<single_thread_powered_client, Client>::value
            && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
            && poolThreadCount_ != 1) {
            HMBDC_THROW(std::out_of_range
                , "cannot add a single thread powered client to the non-single"
                "thread powered pool without specifying a single thread poolThreadAffinity"
            );
        }
        auto stub = new context_detail::PoolConsumerProxy<Client>(client);
        pool_->addConsumer(*stub, poolThreadAffinityIn);

    }

    /**
     * @brief add a batch of clients to the Context's pool - the Clients are run in pool mode
     * @details if the pool is already started, the clients start getting current Messages immediately
     * - they might miss older messages.
     * if the pool is not started yet, the Clients do not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @param client to be added into the Pool
     * @param poolThreadAffinityIn the pool is powered by a number of threads
     * (a thread in the pool is identified (by a number) in the mask starting from bit 0);
     * it is possible to have a Client use just some of the threads in the Pool
     * - defaults to using all.
     * @param args more client and poolThreadAffinityIn pairs can follow
     */
    template <typename Client, typename ... Args>
    void addToPool(Client &client
        , uint64_t poolThreadAffinityIn, Args&& ...args) {
        addToPool(client, poolThreadAffinityIn);
        addToPool(std::forward<Args>(args)...);
    }

    /**
     * @brief add a batch of clients to the Context's pool - the Clients are run in pool mode
     * @details the implementation tells all.
     * if the pool is not started yet, the Clients do not get messages or other callbacks until
     * the Pool starts.
     * This function is threadsafe, which means you can call it anywhere in the code
     * @tparam Client client type
     * @tparam Client2 client2 type
     * @param client to be added into the Pool using the default poolThreadAffinity
     * @param client2 to be added into the Pool
     * @param args more clients (and/or poolThreadAffinityIn pairs) can follow
     */
    template <typename Client, typename Client2, typename ... Args, typename Enabled
        = typename std::enable_if<!std::is_integral<Client2>::value, void>::type>
    void
    addToPool(Client &client, Client2 &client2, Args&& ...args) {
        addToPool(client);
        addToPool(client2, std::forward<Args>(args)...);
    }
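
    // Usage sketch for the addToPool overloads above (illustrative; clientA and
    // clientB stand for assumed Client instances):
    //
    //   ctx.addToPool(clientA);          // clientA may run on any pool thread
    //   ctx.addToPool(clientB, 0x1ul);   // clientB only runs on pool thread 0
    //   ctx.start(2, 0x3ul);             // power the pool with 2 threads on CPU 0 and 1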

    /**
     * @brief return the number of clients added into the pool
     * @details the number could change since clients could be added in another thread
     * @return client count
     */
    size_t clientCountInPool() const {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        return pool_->consumerSize();
    }

    /**
     * @brief how many parallel consumers are started
     * @details this dynamic value could change after the call returns;
     * see the max_parallel_consumer Context property
     * @return how many parallel consumers are started
     */
    size_t parallelConsumerAlive() const {
        return this->buffer_.parallelConsumerAlive();
    }
    /**
     * @brief start the context by specifying what is in it (a Pool and/or direct Clients)
     * and their paired up cpu affinities.
     * @details all direct mode Clients, or Clients in a pool, started by a single start
     * statement are dispatched starting from the same event
     * (subject to the event filtering of each Client).
     * many compile time and runtime checks are done, for example:
     * it won't compile if you start a pool in a Context that does not support one;
     * an exception is thrown if the Context capacity is reached, or if you try to start a second pool, etc.
     *
     * Usage example:
     *
     * @code
     * // the following starts the pool powered by 3 threads that are pinned to
     * // the lower 8 CPUs; client0 pinned to the 4th CPU and client1 pinned to the 5th CPU
     * ctx.start(3, 0xfful, client0, 0x8ul, client1, 0x10ul);
     *
     * // the following starts the pool powered by 3 threads that are pinned to
     * // all existing CPUs; client0 pinned to a rotating CPU and
     * // client1 pinned to the 5th CPU
     * ctx.start(3, 0, client0, 0, client1, 0x10ul);
     *
     * // the following starts 2 direct mode Clients (client2 and client3)
     * ctx.start(client2, 0x3ul, client3, 0xful);
     * @endcode
     *
     * @tparam typename ...Args types
     *
     * @param args paired up args in the form of (pool-thread-count|client, cpuAffinity)*.
     * see the examples above.
     * If a cpuAffinity is 0, each thread's affinity rotates to one of the CPUs in the system.
     */
    template <typename ...Args>
    void
    start(Args&& ... args) {
        startWithContextProperty<cpa>(std::forward<Args>(args) ...);
    }

    /**
     * @brief stop the message dispatching - asynchronously
     * @details asynchronous means message dispatching is not guaranteed
     * to stop immediately after this non-blocking call
     */
    void
    stop() {
        stopWithContextProperty<cpa>();
    }

    /**
     * @brief wait until all threads (Pool threads too, if applicable) of the Context exit
     * @details blocking call
     */
    void
    join() {
        joinWithContextProperty<cpa>();
    }
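
    // Lifecycle sketch (illustrative): a typical shutdown sequence after start();
    // stop() only requests that dispatching ends, join() blocks until the threads exit.
    //
    //   ctx.start(client, 0x1ul);
    //   // ... run until some application condition is met ...
    //   ctx.stop();   // asynchronous request
    //   ctx.join();   // blocks until all Context threads have exited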

    /**
     * @brief an ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...)
     * Clients from the ipc transport to keep the ipc transport healthy (avoiding buffer full).
     * It periodically looks for things to purge. This sets the period (default is 60 seconds).
     * @details if some Clients are known to
     * take long to process messages, increase it. If you need to remove slow Clients quickly,
     * reduce it.
     * Only effective for an ipc_creator Context.
     *
     * @param s seconds
     */
    void
    setSecondsBetweenPurge(uint32_t s) {
        secondsBetweenPurge_ = s;
    }

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop in its pool
     *
     * @param threadSerialNumberInPool starting from 0, indicates which thread in the pool
     * is powering the loop
     */
    void
    runPoolThreadOnce(uint16_t threadSerialNumberInPool) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        pool_->runOnce(threadSerialNumberInPool);
    }

    /**
     * @brief normally not used unless you want to run your own message loop
     * @details call this function frequently to pump the hmbdc message loop for a direct mode Client
     *
     * @param threadSerialNumber indicates which thread is powering the loop
     * @param c the Client
     */
    template <typename Client>
    void runClientThreadOnce(uint16_t threadSerialNumber, Client&& c) {
        // c.messageDispatchingStarted(
        //     hmbdcNumbers_[threadSerialNumber]); //lower level ensures executing only once
        uint16_t tn = cpa::broadcast_msg?hmbdcNumbers_[threadSerialNumber]:threadSerialNumber;

        context_detail::runOnceImpl(tn
            , stopped_, this->buffer_
            , c);
    }
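
    // Usage sketch (illustrative) for driving the dispatching from your own loop
    // instead of calling start(); myClient and keepRunning stand for an assumed
    // direct mode Client and an assumed application flag:
    //
    //   while (keepRunning) {
    //       ctx.runClientThreadOnce(0, myClient);  // pump one round of dispatching
    //   }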

    /**
     * @brief part of the synchronous request reply interface. The caller is blocked until
     * the replies to this request are heard and processed. Just like regular Messages,
     * the REQUEST is sent out to the Clients that are interested in this Context. Since the
     * caller thread is blocked, the user is to ensure the Context does not deadlock due
     * to the blocking
     *
     * @tparam Request the Message type wrapped in the REQUEST<> template
     * @tparam ReplyHandler type for how to handle the replies.
     * example of a handler expecting two-step replies for the requestReply call:
     * @code
     * struct ClientAlsoHandlesReplies
     * : Client<ClientAlsoHandlesReplies, REPLY<Reply0>, REPLY<Reply1>, OtherMsg> {
     *     void handleReplyCb(Reply0 const&){
     *         // got step 1 reply
     *     }
     *     void handleReplyCb(Reply1 const&){
     *         // got step 2 reply
     *         throw hmbdc::ExitCode(0); // all replies received, now
     *                                   // throw to unblock the requestReply call
     *     }
     * };
     * @endcode
     *
     * @param request the REQUEST sent out
     * @param replyHandler the callback logic to handle the replies
     * @param timeout timeout value
     * @return true if the replies are received and handled, false if timed out
     */
    template <RequestC Request, typename ReplyHandler>
    bool
    requestReply(Request&& request, ReplyHandler& replyHandler
        , time::Duration timeout = time::Duration::seconds(0xffffffff)) {
        static_assert(cpa::has_pool && !cpa::pool_msgless
            , "not supported - needs to have a pool");
        return hmbdc::app::requestReply(std::forward<Request>(request)
            , *this
            , *this
            , replyHandler
            , timeout);
    }
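
    // Call sketch for requestReply above (illustrative; MyRequest stands for an assumed
    // request payload and handler is a ClientAlsoHandlesReplies as in the @code example):
    //
    //   ClientAlsoHandlesReplies handler;
    //   hmbdc::app::REQUEST<MyRequest> req{/* assumed: constructed like the wrapped type */};
    //   bool ok = ctx.requestReply(req, handler
    //       , hmbdc::time::Duration::seconds(5)); // blocks; false if timed out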

    /**
     * @brief part of the synchronous request reply interface. Send a constructed REPLY
     * to the Context; it will only reach the original sender Client's reply callbacks.
     * Although the sender Client is blocked, its reply callbacks are invoked on
     * a different thread.
     * ALL REPLYs need to be sent out using this interface to avoid
     * undefined behavior.
     *
     * @tparam Reply the Message type wrapped in the REPLY<> template
     * @param reply the REPLY (previously constructed from a REQUEST) to send out
     */
    template <ReplyC Reply>
    void
    sendReply(Reply&& reply) {
        static_assert(cpa::has_pool && !cpa::pool_msgless
            , "not supported - needs to have a pool");
        this->send(std::forward<Reply>(reply));
    }

private:
    template <typename cpa>
    typename std::enable_if<cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(this->buffer(), maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t) {
        return typename Pool::ptr();
    }

    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    stopWithContextProperty() {
        if (pool_) pool_->stop();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    stopWithContextProperty() {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    joinWithContextProperty() {
        if (pool_) pool_->join();
        for (auto& t : threads_) {
            t.join();
        }
        threads_.clear();
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    joinWithContextProperty() {
        for (auto& t : threads_) {
            t.join();
        }
        threads_.clear();
    }

    template <typename cpa>
    void
    reserveSlots(std::list<uint16_t>&) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        auto available = this->buffer_.unusedConsumerIndexes();
        if (available.size() < poolThreadCount) {
            HMBDC_THROW(std::out_of_range
                , "Context remaining capacity = " << available.size()
                << ", consider increasing max_parallel_consumer");
        }
        for (uint16_t i = 0; i < poolThreadCount; ++i) {
            slots.push_back(available[i]);
            this->buffer_.reset(available[i]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg || cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
        const bool clientParticipateInMessaging =
            std::decay<CcClient>::type::INTERESTS_SIZE != 0;
        if (clientParticipateInMessaging) {
            auto available = this->buffer_.unusedConsumerIndexes();
            if (!available.size()) {
                HMBDC_THROW(std::out_of_range
                    , "Context reached capacity, consider increasing max_parallel_consumer");
            }
            this->buffer_.reset(available[0]);
            slots.push_back(available[0]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::create_ipc || cpa::attach_ipc, void>::type
    startWithContextProperty(Args&& ... args) {
        auto& lock = this->allocator_.fileLock();
        std::lock_guard<decltype(lock)> g(lock);
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, args ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::create_ipc && !cpa::attach_ipc, void>::type
    startWithContextProperty(Args&& ... args) {
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, args ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }

    template <typename cpa>
    typename std::enable_if<cpa::broadcast_msg && cpa::create_ipc, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots) {
        if (!purger_) {
            purger_.reset(
                new StuckClientPurger<Buffer>(secondsBetweenPurge_, this->buffer_));
            startWithContextPropertyImpl<cpa>(slots, *purger_, purgerCpuAffinityMask_);
        }
    }

    template <typename cpa>
    typename std::enable_if<!cpa::broadcast_msg || !cpa::create_ipc, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots) {
    }

    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::has_pool, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots
        , uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        using namespace std;
        if (poolThreadCount_) {
            HMBDC_THROW(std::out_of_range, "Context pool already started");
        }
        std::vector<uint16_t> sc(slots.begin(), slots.end());
        if (!poolThreadsCpuAffinityMask) {
            auto cpuCount = std::thread::hardware_concurrency();
            poolThreadsCpuAffinityMask =
                ((1ul << poolThreadCount) - 1u) << (hmbdcNumbers_.size() % cpuCount);
        }

        pool_->startAt(poolThreadCount, poolThreadsCpuAffinityMask, sc);
        poolThreadCount_ = poolThreadCount; //record the count before the loop below consumes it
        while(poolThreadCount--) {
            if (!cpa::pool_msgless) {
                hmbdcNumbers_.push_back(*slots.begin());
                slots.pop_front();
            }
        }
        startWithContextPropertyImpl<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename cpa, typename Client, typename ...Args>
    typename std::enable_if<!std::is_integral<Client>::value, void>::type
    startWithContextPropertyImpl(std::list<uint16_t>& slots
        , Client& c, uint64_t cpuAffinity
        , Args&& ... args) {
        auto clientParticipateInMessaging =
            std::decay<Client>::type::INTERESTS_SIZE;
        uint16_t hmbdcNumber = 0xffffu;
        if (clientParticipateInMessaging && cpa::broadcast_msg) {
            hmbdcNumber = *slots.begin();
            slots.pop_front();
        }
        auto thrd = kickOffClientThread(
            c, cpuAffinity, hmbdcNumber, hmbdcNumbers_.size());
        threads_.push_back(std::move(thrd));
        hmbdcNumbers_.push_back(hmbdcNumber);
        startWithContextPropertyImpl<cpa>(slots, std::forward<Args>(args) ...);
    }

    template <typename Client>
    auto kickOffClientThread(
        Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
        std::thread thrd([
            this
            , &c
            , mask
            , h=hmbdcNumber
            , threadSerialNumber
            ]() {
            auto hmbdcNumber = h;
            std::string name;
            char const* schedule;
            int priority;
            auto clientParticipateInMessaging =
                std::decay<Client>::type::INTERESTS_SIZE;


            if (c.hmbdcName()) {
                name = c.hmbdcName();
            } else {
                if (clientParticipateInMessaging) {
                    name = "hmbdc" + std::to_string(hmbdcNumber);
                } else {
                    name = "hmbdc-x";
                }
            }
            try {
                auto cpuAffinityMask = mask;
                std::tie(schedule, priority) = c.schedSpec();

                if (!schedule) schedule = "SCHED_OTHER";

                if (!mask) {
                    auto cpuCount = std::thread::hardware_concurrency();
                    cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
                }

                hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
                    , schedule, priority);

                hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
                c.messageDispatchingStartedCb(hmbdcNumber);

            } catch (std::exception const& e) {
                c.stopped(e);
                return;
            } catch (int code) {
                c.stopped(ExitCode(code));
                return;
            } catch (...) {
                c.stopped(UnknownException()); //assumed: report unknown exceptions before exiting
                return;
            }

            while(!stopped_ &&
                context_detail::runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
            }
            if (this->stopped_) c.dropped();
            if (clientParticipateInMessaging) context_detail::unblock(this->buffer_, hmbdcNumber);
        }
        );

        return thrd;
    }

    Context(Context const&) = delete;
    Context& operator = (Context const&) = delete;
    uint16_t usedHmbdcCapacity_;
    std::vector<uint16_t> hmbdcNumbers_;

    bool stopped_;
    typename Pool::ptr pool_;
    using Threads = std::vector<std::thread>;
    Threads threads_;
    size_t poolThreadCount_;
    uint32_t secondsBetweenPurge_;
    uint64_t purgerCpuAffinityMask_;
    typename std::conditional<cpa::broadcast_msg && cpa::create_ipc
        , std::unique_ptr<StuckClientPurger<Buffer>>, uint32_t
        >::type purger_;
};

}}