hmbdc - simplify high performance messaging programming
Domain.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/DefaultUserConfig.hpp"
4 #include "hmbdc/tips/Messages.hpp"
5 #include "hmbdc/tips/TypeTagSet.hpp"
6 #include "hmbdc/tips/Node.hpp"
7 #include "hmbdc/app/BlockingContext.hpp"
8 #include "hmbdc/app/Context.hpp"
9 #include "hmbdc/app/Config.hpp"
10 #include "hmbdc/Exception.hpp"
11 #include "hmbdc/pattern/GuardedSingleton.hpp"
12 #include "hmbdc/MetaUtils.hpp"
13 
14 #include <deque>
15 #include <optional>
16 #include <type_traits>
17 #include <unistd.h>
18 #include <stdlib.h>
19 
20 namespace hmbdc { namespace tips {
21 
22 /**
23  * @brief template that Domain uses for the network communication properties
24  *
25  * @tparam Protocol the TIPS network protocol in use - tcpcast, rmcast etc
26  * @tparam MaxMessageSize the compile-time specified max size of a network-transferred
27  * message; this does not include the attachment size. It can simply be set to
28  * something like hmbdc::max_size_in_tuple<AllSendMessagesTuple>.
29  * If set to 0, the value is configured at runtime.
30  */
31 template <typename Protocol, size_t MaxMessageSize = 1000>
32 struct net_property {
33  enum {
34  max_message_size = MaxMessageSize
35  };
36  using protocol = Protocol;
37 };
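/**
 * Usage sketch (editor illustration, not in the original header; assumes the
 * tcpcast protocol shipped with TIPS under hmbdc/tips/tcpcast/):
 * @code
 * // cap each wire message at 512 bytes at compile time
 * using MyNet = hmbdc::tips::net_property<hmbdc::tips::tcpcast::Protocol, 512>;
 * // or defer the limit to the "netMaxMessageSizeRuntime" config value
 * using MyNetRt = hmbdc::tips::net_property<hmbdc::tips::tcpcast::Protocol, 0>;
 * @endcode
 */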
38 
39 /**
40  * @brief Placeholder for the Protocol within net_property
41  * that turns off network communication at compile time
42  *
43  */
44 struct NoProtocol
45 : pattern::GuardedSingleton<NoProtocol> {
46  using SendTransportEngine = void*;
47  template <typename Buffer, typename AttachmentAllocator>
48  using RecvTransportEngine = void*;
49 
50  /**
51  * @brief construct the host-wide unique TIPS domain name
52  * from a configuration
53  *
54  * @param cfg the configuration that contains the network specifics
55  * @return std::string
56  */
57  std::string getTipsDomainName(app::Config const& cfg) {
58  return cfg.getExt<std::string>("tipsDomainNonNet");
59  }
60 
61  private:
62  friend struct pattern::SingletonGuardian<NoProtocol>;
63  NoProtocol(){}
64 };
65 
66 /**
67  * @brief net_property that turns off network communication at compile time
68  *
69  */
70 using NoNet = net_property<NoProtocol, 0>;
71 
72 /**
73  * @brief template that Domain uses for the IPC communication properties
74  *
75  * @tparam IpcCapacity power-of-2 value starting from 4 (8, 16 ... 256).
76  * It specifies up to how many IPC parties (processes) are communicating
77  * @tparam MaxMessageSize the compile-time specified max size of an IPC-transferred
78  * message; this does not include the attachment size. It can simply be set to
79  * something like hmbdc::max_size_in_tuple<AllSendMessagesTuple>.
80  * If set to 0, the value is configured at runtime.
81  */
82 template <uint16_t IpcCapacity = 64, size_t MaxMessageSize = 1000>
83 struct ipc_property {
84  enum {
85  capacity = IpcCapacity,
86  max_message_size = MaxMessageSize
87  };
88 };
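/**
 * Usage sketch (editor illustration): allow up to 16 communicating processes
 * and IPC messages of up to 1024 bytes:
 * @code
 * using MyIpc = hmbdc::tips::ipc_property<16, 1024>;
 * @endcode
 */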
89 
90 /**
91  * @brief ipc_property that turns off IPC communication at compile time
92  *
93  */
94 using NoIpc = ipc_property<0, 0>;
95 
96 namespace domain_detail {
97 // HMBDC_CLASS_HAS_DECLARE(upgradeToHasSharedPtrAttachment);
98 template <MessageC Message, bool canSerialize = has_toHmbdcSerialized<Message>::value>
99 struct matching_ipcable {
100  using type = std::result_of_t<decltype(&Message::toHmbdcSerialized)(Message)>;
101 };
102 
103 template <MessageC Message>
104 struct matching_ipcable<Message, false> {
105  using type = Message;
106 };
107 
108 template <MessageTupleC MessageTuple>
109 struct recv_ipc_converted;
110 
111 template <>
112 struct recv_ipc_converted<std::tuple<>> {
113  using type = std::tuple<>;
114 };
115 
116 template <MessageC Message>
117 struct is_ipcable {
118  enum {
119  value = (std::is_trivially_destructible<Message>::value
120  || has_toHmbdcSerialized<Message>::value) ? 1:0,
121  };
122 };
123 
124 template <MessageC Message, MessageC ...Messages>
125 struct recv_ipc_converted<std::tuple<Message, Messages...>> {
126  using next = typename recv_ipc_converted<std::tuple<Messages...>>::type;
127  using type = typename std::conditional<is_ipcable<Message>::value
129  , next
130  >::type;
131 };
132 
133 template <MessageC Message>
134 struct ipc_from : Message {
135  ipc_from(pid_t from, Message const& m)
136  : Message(m)
137  , hmbdcIpcFrom(from) {}
138  pid_t hmbdcIpcFrom;
139 };
140 
141 } //domain_detail
142 
143 /**
144  * @brief when a Domain (its pumps) receives a hasMemoryAttachment, there is a need
145  * to allocate the desired memory to hold the attachment bytes and release it after it is consumed.
146  * This is the policy type dictating how that is done by default - using malloc/free
147  */
148 struct DefaultAttachmentAllocator {
149  /**
150  * @brief fill in hasMemoryAttachment so it holds the desired
151  * memory and the incoming attachment can be held
152  *
153  * @param typeTag the hasMemoryAttachment message type tag
154  * @param att the hasMemoryAttachment struct to be filled in
155  * @return the allocated memory
156  */
157  void* operator()(uint16_t typeTag, app::hasMemoryAttachment* att) {
158  att->attachment = ::malloc(att->len);
159  att->afterConsumedCleanupFunc = [](app::hasMemoryAttachment* hasAtt) {
160  ::free(hasAtt->attachment);
161  hasAtt->attachment = nullptr;
162  };
163  return att->attachment;
164  }
165 };
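/**
 * A replacement allocator policy only needs to match the call signature above.
 * Sketch (editor illustration; myPoolAlloc / myPoolFree are hypothetical user
 * functions):
 * @code
 * struct PoolAttachmentAllocator {
 *     void* operator()(uint16_t, app::hasMemoryAttachment* att) {
 *         att->attachment = myPoolAlloc(att->len);
 *         att->afterConsumedCleanupFunc = [](app::hasMemoryAttachment* a) {
 *             myPoolFree(a->attachment);
 *             a->attachment = nullptr;
 *         };
 *         return att->attachment;
 *     }
 * };
 * @endcode
 */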
166 
167 /**
168  * @brief Messages published on a TIPS pub/sub domain reach all the Nodes
169  * within that domain based on their subscriptions.
170  * This class represents a TIPS domain's handle / interface / facade in its process.
171  * Typically - by recommendation - there is just a single Domain object for a specific TIPS domain
172  * within a process.
173  *
174  * @details A Domain object also manages a group of Nodes and their communication
175  * across the TIPS domain that this Domain object maps to.
176  * The TIPS domain is determined by the network configurations
177  * such as which NIC interface and multicast addresses are configured -
178  * see getTipsDomainName for the network transport protocol
179  *
180  * A Node can only be managed (be started) within a single Domain.
181  * A Domain instance can only be configured to map to a single TIPS domain.
182  * Multiple Domain objects could be mapped to a single TIPS domain - in one (not recommended) or
183  * multiple processes on the same or different networked hosts.
184  *
185  * @tparam RecvMessageTupleIn a std tuple that lists all the receiving message types
186  * Any inbound messages (to the Domain) that are not within this list are dropped without
187  * being delivered to the Nodes in this Domain
188  * @tparam IpcProp IPC property - see ipc_property template
189  * @tparam NetProp Network communication properties - see net_property template
190  * @tparam NodeContext the template that manages the nodes and accepts the inter-thread
191  * messages
192  * @tparam AttachmentAllocator the memory allocation policy for attachment - see DefaultAttachmentAllocator
193  */
194 template <MessageTupleC RecvMessageTupleIn
195  , typename IpcProp = NoIpc
196  , typename NetProp = NoNet
197  , template <class...> class NodeContext = app::BlockingContext
198  , typename AttachmentAllocator = DefaultAttachmentAllocator >
199 struct Domain {
200  using IpcProperty = IpcProp;
201  using NetProperty = NetProp;
202  using NetProtocol = typename NetProperty::protocol;
203 private:
204  using RecvMessageTuple = typename hmbdc::remove_duplicate<RecvMessageTupleIn>::type;
205  using ThreadCtx = NodeContext<RecvMessageTuple>;
206 
207  ThreadCtx threadCtx_;
208 
209  using IpcableRecvMessages = typename domain_detail::recv_ipc_converted<RecvMessageTuple>::type;
210  using NetableRecvMessages = IpcableRecvMessages;
211 
212  enum {
213  run_pump_in_ipc_portal = IpcProperty::capacity != 0,
214  run_pump_in_thread_ctx = run_pump_in_ipc_portal
215  ? 0 : !std::is_same<NoNet, NetProperty>::value,
216  has_a_pump = run_pump_in_ipc_portal || run_pump_in_thread_ctx,
217  has_net_send_eng = !std::is_same<NoNet, NetProperty>::value,
218  has_net_recv_eng = !std::is_same<NoNet, NetProperty>::value
219  && std::tuple_size<NetableRecvMessages>::value,
220  };
221 
222  template <MessageC Message> using ipc_from = domain_detail::ipc_from<Message>;
223 
224  using IpcSubscribeMessagesPossible = IpcableRecvMessages;
225  using IpcTransport = typename std::conditional<
226  IpcProperty::capacity != 0
227  , app::Context<
228  (IpcProperty::max_message_size != 0
229  ? IpcProperty::max_message_size
230  : 0)
233  >
234  , void*
235  >::type;
236 
237  std::optional<IpcTransport> ipcTransport_;
238  struct OneBuffer {
239  ThreadCtx& threadCtx;
240  size_t dispCount = 0;
241  size_t maxItemSize_;
242 
243  OneBuffer(ThreadCtx& threadCtx, size_t maxItemSizeIn)
244  : threadCtx(threadCtx)
245  , maxItemSize_(maxItemSizeIn) {
246  }
247  size_t maxItemSize() const {
248  return maxItemSize_ + sizeof(app::MessageHead);
249  }
250 
251  template <MessageC Message>
252  void handleMessageCb(Message& msg) {
253  using hasMemoryAttachment = app::hasMemoryAttachment;
254  if constexpr (has_toHmbdcUnserialized<Message>::value) {
255  static_assert(std::is_same<Message
256  , decltype(msg.toHmbdcUnserialized().toHmbdcSerialized())>::value
257  , "mising or wrong toHmbdcSerialized() func - cannot serialize");
258  threadCtx.send(msg.toHmbdcUnserialized());
259  } else {
260  threadCtx.send(msg);
261  }
262  if constexpr(std::is_base_of<hasMemoryAttachment, Message>::value) {
263  /// do not let MD release the data
264  msg.hasMemoryAttachment::attachment = nullptr;
265  msg.hasMemoryAttachment::len = 0;
266  }
267  }
268 
269  void handleJustBytesCb(uint16_t tag, void const* bytes, app::hasMemoryAttachment* att) {
270  threadCtx.sendJustBytesInPlace(tag, bytes, maxItemSize_, att);
271  if (att) {
272  /// do not let MD release the data
273  att->attachment = nullptr;
274  att->len = 0;
275  }
276  }
277 
278  void put(void* item, size_t) {
279  using NoAttRecv = typename filter_out_tuple_by_base<app::hasMemoryAttachment
280  , NetableRecvMessages>::type;
281  static app::MessageDispacher<OneBuffer, NoAttRecv> disp;
282  auto& h = *(app::MessageHead*)(item);
283  h.scratchpad().desc.flag = 0;
284  if (disp(*this, h)) {
285  dispCount++;
286  }
287  }
288 
289  void putAtt(app::MessageWrap<app::hasMemoryAttachment>* item, size_t) {
290  using AttRecv = typename filter_in_tuple_by_base<app::hasMemoryAttachment
291  , NetableRecvMessages>::type;
292  static app::MessageDispacher<OneBuffer, AttRecv> disp;
293  item->scratchpad().desc.flag = app::hasMemoryAttachment::flag;
294  if (disp(*this, *item)) {
295  dispCount++;
296  } else {
297  /// MD did not release the data, do it here
298  item->payload.release();
299  }
300  }
301 
302  template <typename T> void put(T& item) {put(&item, sizeof(T));}
303  template <typename T> void putSome(T& item) {
304  put(&item, std::min(sizeof(item), maxItemSize()));
305  }
306  };
307 
308  struct PumpInThreadCtx
309  : public app::client_using_tuple<PumpInThreadCtx, std::tuple<>>::type {
310  std::optional<typename NetProtocol::SendTransportEngine> sendEng_;
311  using RecvTransportEngine
312  = typename NetProtocol::template RecvTransportEngine<OneBuffer, AttachmentAllocator>;
313  std::optional<RecvTransportEngine> recvEng_;
314  ThreadCtx& outCtx_;
315  OneBuffer netBuffer_;
316  std::string hmbdcName_;
317 
318  public:
319  typename ThreadCtx::ClientRegisterHandle handleInCtx;
320 
321  PumpInThreadCtx(ThreadCtx& outCtx, app::Config const& cfg)
322  : outCtx_(outCtx)
323  , netBuffer_{outCtx
324  , NetProperty::max_message_size != 0
325  ? NetProperty::max_message_size
326  : cfg.getExt<uint32_t>("netMaxMessageSizeRuntime")
327  }
328  , hmbdcName_(cfg.getExt<std::string>("pumpHmbdcName")) {
329  if constexpr (has_net_send_eng) {
330  uint32_t limit = NetProperty::max_message_size;
331  if (limit == 0) {
332  limit = cfg.getExt<uint32_t>("netMaxMessageSizeRuntime");
333  }
334  sendEng_.emplace(cfg, limit);
335  }
336  if constexpr (has_net_recv_eng) {
337  recvEng_.emplace(cfg, netBuffer_);
338  }
339  }
340 
341  template <typename CcNode>
342  void subscribeFor(CcNode const& node, uint16_t mod, uint16_t res) {
343  if constexpr (has_net_recv_eng) {
344  using Messages = typename filter_in_tuple<domain_detail::is_ipcable
345  , typename CcNode::RecvMessageTuple>::type;
346  recvEng_->template subscribeFor<Messages>(node, mod, res);
347  }
348  }
349 
350  size_t ipcSubscribingPartyCount(uint16_t tag) const {
351  return 0;
352  }
353 
354  size_t netSubscribingPartyCount(uint16_t tag) const {
355  auto res = size_t{0};
356  if constexpr (has_net_send_eng) {
357  res += sendEng_->subscribingPartyDetectedCount(tag);
358  }
359  return res;
360  }
361 
362  template <typename CcNode>
363  void advertiseFor(CcNode const& node, uint16_t mod, uint16_t res) {
364  if constexpr (has_net_send_eng) {
365  using Messages = typename filter_in_tuple<domain_detail::is_ipcable
366  , typename CcNode::SendMessageTuple>::type;
367  sendEng_->template advertiseFor<Messages>(node, mod, res);
368  }
369  }
370 
371  size_t netSendingPartyDetectedCount() const {
372  if constexpr (has_net_recv_eng) {
373  return recvEng_->sessionsRemainingActive();
374  }
375  return 0;
376  }
377 
378  size_t netRecvingPartyDetectedCount() const {
379  if constexpr (has_net_send_eng) {
380  return sendEng_->sessionsRemainingActive();
381  }
382  return 0;
383  }
384 
385  template <MessageC Message>
386  void send(Message&& message) {
387  using M = typename std::decay<Message>::type;
388  using Mnet = typename domain_detail::matching_ipcable<M>::type;
389 
390  if constexpr (std::is_trivially_destructible<Mnet>::value) {
391  if constexpr (has_tipsDisableSendMask<M>::value) {
392  if (M::tipsDisableSendMask() & OVER_NETWORK) return;
393  }
394  if constexpr (has_net_send_eng) {
395  if constexpr(has_toHmbdcSerialized<M>::value) {
396  static_assert(std::is_same<M
397  , decltype(message.toHmbdcSerialized().toHmbdcUnserialized())>::value
398  , "mising or wrong toHmbdcUnserialized() func - cannot convertback");
399  static_assert(NetProperty::max_message_size == 0
400  || sizeof(decltype(message.toHmbdcSerialized())) <= NetProperty::max_message_size
401  , "NetProperty::max_message_size is too small");
402  sendEng_->queue(message.toHmbdcSerialized());
403  } else {
404  static_assert(NetProperty::max_message_size == 0
405  || sizeof(message) <= NetProperty::max_message_size
406  , "NetProperty::max_message_size is too small");
407  sendEng_->queue(message);
408  }
409  }
410  }
411  }
412 
413  void sendJustBytes(uint16_t tag, void const* bytes, size_t len
414  , app::hasMemoryAttachment* att) {
415  if constexpr (has_net_send_eng) {
416  sendEng_->queueJustBytes(tag, bytes, len, att);
417  }
418  }
419 
420  char const* hmbdcName() const {
421  return hmbdcName_.c_str();
422  }
423 
424  void invokedCb(size_t previousBatch) {
425  bool layback = !previousBatch;
426  if constexpr (has_net_send_eng) {
427  layback = layback && !sendEng_->bufferedMessageCount();
428  }
429  if (layback) {
430  std::this_thread::yield();
431  }
432 
433  if constexpr (has_net_recv_eng) {
434  netBuffer_.dispCount = 0;
435  recvEng_->rotate();
436  }
437  if constexpr (has_net_send_eng) {
438  sendEng_->rotate();
439  }
440  }
441 
442  void stop() {
443  if constexpr (has_net_send_eng) {
444  sendEng_->stop();
445  }
446  }
447  };
448 
449  template <size_t MAX_MEMORY_ATTACHMENT>
450  struct InBandMemoryAttachmentProcessor {
451  InBandMemoryAttachmentProcessor()
452  : att(new(underlyingMessage) app::hasMemoryAttachment){}
453  uint8_t underlyingMessage[MAX_MEMORY_ATTACHMENT];
454  AttachmentAllocator attAllocator;
455 
456  template <MessageC Message>
457  bool cache(app::InBandHasMemoryAttachment<Message> const& ibma, uint16_t typeTagIn) {
458  static_assert(sizeof(Message) <= MAX_MEMORY_ATTACHMENT, "");
459  if (hmbdc_unlikely(att->attachment)) {
460  HMBDC_THROW(std::logic_error, "previous InBandMemoryAttachment not concluded");
461  }
462  typeTag = typeTagIn;
463  if constexpr(Message::justBytes) {
464  memcpy(underlyingMessage, &ibma.underlyingMessage, sizeof(underlyingMessage));
465  } else {
466  memcpy(underlyingMessage, &ibma.underlyingMessage, sizeof(Message));
467  }
468 
469  if (att->holdShmHandle<Message>()) {
470  if constexpr (app::has_hmbdcShmRefCount<Message>::value) {
471  auto shmAddr = hmbdcShmHandleToAddr(att->shmHandle);
472  att->attachment = shmAddr;
473  att->clientData[0] = (uint64_t)&hmbdcShmDeallocator;
474  static_assert(sizeof(ibma.underlyingMessage.hmbdcShmRefCount) == sizeof(size_t));
475  att->clientData[1] = (uint64_t)&ibma.underlyingMessage.hmbdcShmRefCount;
476  if constexpr (Message::is_att_0cpyshm) {
477  att->afterConsumedCleanupFunc = [](app::hasMemoryAttachment* h) {
478  auto hmbdcShmRefCount = (size_t*)h->attachment;
479  if (h->attachment
480  && 0 == __atomic_sub_fetch(hmbdcShmRefCount, 1, __ATOMIC_RELEASE)) {
481  auto& hmbdcShmDeallocator
482  = *(std::function<void (uint8_t*)>*)h->clientData[0];
483  hmbdcShmDeallocator((uint8_t*)h->attachment);
484  }
485  };
486  } else if constexpr (app::has_hmbdcShmRefCount<Message>::value) {
487  att->afterConsumedCleanupFunc = [](app::hasMemoryAttachment* h) {
488  auto hmbdcShmRefCount = (size_t*)h->clientData[1];
489  if (h->attachment
490  && 0 == __atomic_sub_fetch(hmbdcShmRefCount, 1, __ATOMIC_RELEASE)) {
491  auto& hmbdcShmDeallocator
492  = *(std::function<void (uint8_t*)>*)h->clientData[0];
493  hmbdcShmDeallocator((uint8_t*)h->attachment);
494  }
495  };
496  }
497  accSize = att->len;
498  } /// no else
499  return true;
500  } else {
501  attAllocator(typeTagIn, att);
502  // att->attachment = malloc(att->len);
503  // att->afterConsumedCleanupFunc = hasMemoryAttachment::free;
504  accSize = 0;
505  return false;
506  }
507  }
508 
509  bool accumulate(app::InBandMemorySeg const& ibms, size_t n) {
510  if (accSize < att->len) {
511  auto attBytes = ibms.seg;
512  auto copySize = std::min(n, att->len - accSize);
513  memcpy((char*)att->attachment + accSize, attBytes, copySize);
514  accSize += copySize;
515  return accSize == att->len;
516  }
517  return true;
518  }
519  app::hasMemoryAttachment * const att;
520  uint16_t typeTag = 0;
521  size_t accSize = 0;
522  std::function<void* (boost::interprocess::managed_shared_memory::handle_t)>
523  hmbdcShmHandleToAddr;
524  std::function<void (uint8_t*)> hmbdcShmDeallocator;
525  };
526 
527  struct PumpInIpcPortal
528  : public app::client_using_tuple<PumpInIpcPortal, IpcSubscribeMessagesPossible>::type {
529  std::optional<typename NetProtocol::SendTransportEngine> sendEng_;
530  using RecvTransportEngine
531  = typename NetProtocol::template RecvTransportEngine<OneBuffer, AttachmentAllocator>;
532  std::optional<RecvTransportEngine> recvEng_;
533  IpcTransport& ipcTransport_;
534  ThreadCtx& outCtx_;
535  OneBuffer netBuffer_;
536  os::ShmBasePtrAllocator allocator_;
537  TypeTagSet* pOutboundSubscriptions_;
538  TypeTagSet inboundSubscriptions_;
539  std::string hmbdcName_;
540  uint32_t pumpMaxBlockingTimeSec_;
541 
542  public:
543  InBandMemoryAttachmentProcessor<
544  std::max((size_t)PumpInIpcPortal::MAX_MEMORY_ATTACHMENT, (size_t)(64 * 1024))> ibmaProc;
545  pid_t const hmbdcAvoidIpcFrom = getpid();
546  PumpInIpcPortal(bool& tryOwn, IpcTransport& ipcTransport
547  , ThreadCtx& outCtx, app::Config const& cfg)
548  : ipcTransport_(ipcTransport)
549  , outCtx_(outCtx)
550  , netBuffer_{outCtx
551  , NetProperty::max_message_size != 0
552  ? NetProperty::max_message_size
553  : cfg.getExt<uint32_t>("netMaxMessageSizeRuntime")
554  }
555  , allocator_((NetProtocol::instance().getTipsDomainName(cfg) + "-ipcsubs").c_str()
556  , 0
557  , sizeof(*pOutboundSubscriptions_)
558  , tryOwn)
559  , pOutboundSubscriptions_(
560  allocator_.template allocate<TypeTagSet>(SMP_CACHE_BYTES))
561  , hmbdcName_(cfg.getExt<std::string>("pumpHmbdcName"))
562  , pumpMaxBlockingTimeSec_(cfg.getHex<double>("pumpMaxBlockingTimeSec") * 1000000) {
563  pumpMaxBlockingTimeSec_ = std::min(1000000u, pumpMaxBlockingTimeSec_);
564  static_assert(IpcTransport::MAX_MESSAGE_SIZE == 0 || IpcTransport::MAX_MESSAGE_SIZE
566 
567  if constexpr (has_net_send_eng) {
568  uint32_t limit = NetProperty::max_message_size;
569  if (limit == 0) {
570  limit = cfg.getExt<uint32_t>("netMaxMessageSizeRuntime");
571  }
572  sendEng_.emplace(cfg, limit);
573  }
574  if constexpr (has_net_recv_eng) {
575  recvEng_.emplace(cfg, netBuffer_);
576  }
577  }
578 
579  template <typename CcNode>
580  void subscribeFor(CcNode const& node, uint16_t mod, uint16_t res) {
581  using Messages = typename filter_in_tuple<domain_detail::is_ipcable
582  , typename CcNode::RecvMessageTuple>::type;
583  inboundSubscriptions_.addSubsFor<Messages>(node, mod, res
584  , [this](uint16_t tag) {
585  pOutboundSubscriptions_->add(tag);
586  });
587 
588  if constexpr (has_net_recv_eng) {
589  recvEng_->template subscribeFor<Messages>(node, mod, res);
590  }
591  }
592 
593  size_t ipcSubscribingPartyCount(uint16_t tag) const {
594  return pOutboundSubscriptions_->check(tag) - inboundSubscriptions_.check(tag);
595  }
596 
597  size_t netSubscribingPartyCount(uint16_t tag) const {
598  auto res = size_t{0};
599  if constexpr (has_net_send_eng) {
600  res += sendEng_->subscribingPartyDetectedCount(tag);
601  }
602  return res;
603  }
604 
605  template <typename CcNode>
606  void advertiseFor(CcNode const& node, uint16_t mod, uint16_t res) {
607  using Messages = typename filter_in_tuple<domain_detail::is_ipcable
608  , typename CcNode::SendMessageTuple>::type;
609  if constexpr (has_net_send_eng) {
610  sendEng_->template advertiseFor<Messages>(node, mod, res);
611  }
612  }
613 
614  size_t netSendingPartyDetectedCount() const {
615  if constexpr (has_net_recv_eng) {
616  return recvEng_->sessionsRemainingActive();
617  }
618  return 0;
619  }
620 
621  size_t netRecvingPartyDetectedCount() const {
622  if constexpr (has_net_send_eng) {
623  return sendEng_->sessionsRemainingActive();
624  }
625  return 0;
626  }
627 
628 
629  template <MessageC Message>
630  void send(Message&& message) {
631  using M = typename std::decay<Message>::type;
632  using Mipc = typename domain_detail::matching_ipcable<M>::type;
633 
634  if constexpr (std::is_trivially_destructible<Mipc>::value) {
635  bool disableInterProcess = false;
636  if constexpr (has_tipsDisableSendMask<M>::value) {
637  if (M::tipsDisableSendMask() & INTER_PROCESS) {
638  disableInterProcess = true;
639  }
640  }
641  bool disableNet = false;
642  if constexpr (!has_net_send_eng) {
643  disableNet = true;
644  } else if constexpr (has_tipsDisableSendMask<M>::value) {
645  if (M::tipsDisableSendMask() & OVER_NETWORK) disableNet = true;
646  }
647 
648  std::optional<Mipc> serializedCached;
649  app::hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFuncKept = nullptr;
650  (void)afterConsumedCleanupFuncKept;
651  if (!disableInterProcess) {
652  //only send when others have interests
653  auto intDiff = pOutboundSubscriptions_->check(message.getTypeTag())
654  - inboundSubscriptions_.check(message.getTypeTag());
655  if (intDiff) {
656  //ipc message should not come back based on hmbdcAvoidIpcFrom
657  using ToSendType = ipc_from<Mipc>;
658  if constexpr (has_toHmbdcSerialized<M>::value) {
659  static_assert(std::is_same<M
660  , decltype(message.toHmbdcSerialized().toHmbdcUnserialized())>::value
661  , "mising or wrong toHmbdcUnserialized() func - cannot convertback");
662  serializedCached.emplace(message.toHmbdcSerialized());
663  if (!disableNet) {
664  //keep the att around
665  std::swap(afterConsumedCleanupFuncKept, serializedCached->afterConsumedCleanupFunc);
666  }
667  auto toSend = ToSendType{hmbdcAvoidIpcFrom, *serializedCached};
668 
669  if constexpr (app::has_hmbdcShmRefCount<ToSendType>::value) {
670  toSend.hmbdcShmRefCount = intDiff;
671  if constexpr (ToSendType::is_att_0cpyshm) {
672  auto hmbdc0cpyShmRefCount = (size_t*)toSend.app::hasMemoryAttachment::attachment;
673  __atomic_add_fetch(hmbdc0cpyShmRefCount, intDiff, __ATOMIC_RELEASE);
674  }
675  }
676  ipcTransport_.send(std::move(toSend));
677  } else if constexpr(std::is_base_of<app::hasMemoryAttachment, M>::value) {
678  auto toSend = ToSendType{hmbdcAvoidIpcFrom, message};
679  if constexpr (app::has_hmbdcShmRefCount<ToSendType>::value) {
680  toSend.hmbdcShmRefCount = intDiff;
681  if constexpr (ToSendType::is_att_0cpyshm) {
682  auto hmbdc0cpyShmRefCount = (size_t*)toSend.app::hasMemoryAttachment::attachment;
683  __atomic_add_fetch(hmbdc0cpyShmRefCount, intDiff, __ATOMIC_RELEASE);
684  }
685  }
686  ipcTransport_.send(std::move(toSend));
687  } else {
688  ipcTransport_.template sendInPlace<ToSendType>(hmbdcAvoidIpcFrom, message);
689  }
690  }
691  }
692 
693  if (disableNet) return;
694 
695  if constexpr (has_net_send_eng) {
696  if constexpr(has_toHmbdcSerialized<M>::value) {
697 
698  static_assert(NetProperty::max_message_size == 0
699  || sizeof(Mipc) <= NetProperty::max_message_size
700  , "NetProperty::max_message_size is too small");
701  if (serializedCached) {
702  // restore
703  std::swap(afterConsumedCleanupFuncKept, serializedCached->afterConsumedCleanupFunc);
704  sendEng_->queue(*serializedCached);
705  } else {
706  sendEng_->queue(message.toHmbdcSerialized());
707  }
708  } else {
709  static_assert(NetProperty::max_message_size == 0
710  || sizeof(Mipc) <= NetProperty::max_message_size
711  , "NetProperty::max_message_size is too small");
712  sendEng_->queue(message);
713  }
714  }
715  }
716  }
717 
718  void sendJustBytes(uint16_t tag, void const* bytes, size_t len
719  , app::hasMemoryAttachment* att) {
720  app::hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFuncKept
721  = att ? att->afterConsumedCleanupFunc : nullptr;
722  (void)afterConsumedCleanupFuncKept;
723  //only send when others have interests
724  if (pOutboundSubscriptions_->check(tag) > inboundSubscriptions_.check(tag)) {
725  //ipc message should not come back based on hmbdcAvoidIpcFrom
726  if (has_net_send_eng && att) {
727  //keep the att around
728  att->afterConsumedCleanupFunc = nullptr;
729  }
730  ipcTransport_.template sendJustBytesInPlace<ipc_from<app::JustBytes>>(
731  tag, bytes, len, att, hmbdcAvoidIpcFrom);
732  }
733 
734  if constexpr (has_net_send_eng) {
735  if (att) {
736  att->afterConsumedCleanupFunc = afterConsumedCleanupFuncKept;
737  }
738  sendEng_->queueJustBytes(tag, bytes, len, att);
739  }
740  }
741 
742  char const* hmbdcName() const {
743  return hmbdcName_.c_str();
744  }
745 
746  void invokedCb(size_t previousBatch) {
747  bool layback = !previousBatch;
748  if constexpr (has_net_send_eng) {
749  layback = layback && !sendEng_->bufferedMessageCount();
750  }
751  if (layback) {
752  if (pumpMaxBlockingTimeSec_) {
753  usleep(pumpMaxBlockingTimeSec_);
754  } else {
755  std::this_thread::yield();
756  }
757  }
758  if constexpr (has_net_recv_eng) {
759  recvEng_->rotate();
760  }
761  if constexpr (has_net_send_eng) {
762  sendEng_->rotate();
763  }
764  }
765 
766  template <MessageC Message>
767  void handleMessageCb(Message& m) {
768  if (hmbdc_unlikely(!inboundSubscriptions_.check(m.getTypeTag()))) {
769  return;
770  }
771  Message& msg = m;
772  using hasMemoryAttachment = app::hasMemoryAttachment;
773  if constexpr (has_toHmbdcUnserialized<Message>::value) {
774  static_assert(std::is_same<Message
775  , decltype(msg.toHmbdcUnserialized().toHmbdcSerialized())>::value
776  , "mising or wrong toHmbdcSerialized() func - cannot serialize");
777  outCtx_.send(msg.toHmbdcUnserialized());
778  } else {
779  outCtx_.send(msg);
780  }
781  if constexpr(std::is_base_of<hasMemoryAttachment, Message>::value) {
782  msg.hasMemoryAttachment::attachment = nullptr;
783  msg.hasMemoryAttachment::len = 0;
784  }
785  }
786 
787  void handleJustBytesCb(uint16_t tag, uint8_t* bytes, app::hasMemoryAttachment* att) {
788  if (hmbdc_unlikely(!inboundSubscriptions_.check(tag))) {
789  return;
790  }
791  outCtx_.sendJustBytesInPlace(tag, bytes, ipcTransport_.maxMessageSize(), att);
792  if (att) {
793  att->attachment = nullptr;
794  att->len = 0;
795  }
796  }
797 
798  void stop() {
799  if constexpr (has_net_send_eng) {
800  sendEng_->stop();
801  }
802  }
803  }; //end of Pumps
804 
805  app::Config config_;
806  using Pump = typename std::conditional<
807  run_pump_in_ipc_portal
808  , PumpInIpcPortal
809  , typename std::conditional<
810  run_pump_in_thread_ctx
811  , PumpInThreadCtx
812  , void*
813  >::type
814  >::type;
815  std::deque<Pump> pumps_;
816 
817  bool ownIpcTransport_ = false;
818 
819 public:
820  /**
821  * @brief Construct a new Domain object
822  *
823  * @param cfg The JSON configuration to specify the IPC and network transport details
824  * See the DefaultUserConfiguration.hpp files for each transport type
825  */
826  Domain(app::Config const& cfg)
827  : config_(cfg) {
828  config_.setAdditionalFallbackConfig(app::Config(DefaultUserConfig));
829  auto pumpRunMode = config_.getExt<std::string>("pumpRunMode");
830  if constexpr (run_pump_in_ipc_portal) {
831  ownIpcTransport_ = config_.getExt<bool>("createIpcTransportIfNotExist");
832  ipcTransport_.emplace(ownIpcTransport_
833  , NetProtocol::instance().getTipsDomainName(config_).c_str()
834  , config_.getExt<uint32_t>("ipcMessageQueueSizePower2Num")
835  , 0 //no pool
836  , IpcTransport::MAX_MESSAGE_SIZE != 0
837  ? IpcTransport::MAX_MESSAGE_SIZE
838  : config_.getExt<size_t>("ipcMaxMessageSizeRuntime")
839 
840  , 0xfffffffffffffffful
841  , 0
842  , config_.getExt<size_t>("ipcShmForAttPoolSize")
843  );
844 
845  ipcTransport_->setSecondsBetweenPurge(
846  config_.getExt<uint32_t>("ipcPurgeIntervalSeconds"));
847 
848  auto pumpCount = config_.getExt<uint32_t>("pumpCount");
849  if (pumpCount > 64) {
850  HMBDC_THROW(std::out_of_range, "pumpCount > 64 is not supported");
851  }
852  auto ownIpcTransport = ownIpcTransport_;
853  for (auto i = 0u; i < pumpCount; ++i) {
854  auto& pump = pumps_.emplace_back(ownIpcTransport, *ipcTransport_, threadCtx_, config_);
855  if (pumpRunMode == "auto") {
856  ipcTransport_->start(pump
857  , config_.getHex<uint64_t>("pumpCpuAffinityHex"));
858  } else if (pumpRunMode == "manual") {
859  ipcTransport_->registerToRun(pump
860  , config_.getHex<uint64_t>("pumpCpuAffinityHex"));
861  } else if (pumpRunMode == "delayed") {
862  } else {
863  HMBDC_THROW(std::out_of_range, "pumpRunMode=" << pumpRunMode << " not supported");
864  }
865  }
866  } else if constexpr (run_pump_in_thread_ctx) {
867  for (auto i = 0u; i < config_.getExt<uint32_t>("pumpCount"); ++i) {
868  auto& pump = pumps_.emplace_back(threadCtx_, config_);
869  if (pumpRunMode == "auto") {
870  threadCtx_.start(pump, 0, 0
871  , config_.getHex<uint64_t>("pumpCpuAffinityHex")
872  , config_.getHex<time::Duration>("pumpMaxBlockingTimeSec"));
873  } else if (pumpRunMode == "manual") {
874  pump.handleInCtx = threadCtx_.registerToRun(pump, 0, 0);
875  } else if (pumpRunMode == "delayed") {
876  } else {
877  HMBDC_THROW(std::out_of_range, "pumpRunMode=" << pumpRunMode << " not supported");
878  }
879  }
880  } // else no pump needed
881  }
882 
883  /**
884  * @brief manually drive the domain's pumps
885  *
886  * @tparam Args the arguments used for pump
887  * @param pumpIndex which pump - see pumpCount configure
888  * @param args the arguments used for pump
889  * - if IPC is enabled:
890  * empty
891  * - else
892  * time::Duration maxBlockingTime
893  *
894  * @return true if the pump ran fine
895  * @return false if the Domain stopped or an exception was thrown
896  */
897  template <typename... Args>
898  bool runOnce(size_t pumpIndex, Args&& ...args) {
899  auto& pump = pumps_[pumpIndex];
900  if constexpr (run_pump_in_ipc_portal) {
901  return ipcTransport_->runOnce(pump, std::forward<Args>(args)...);
902  } else if constexpr (run_pump_in_thread_ctx) {
903  return threadCtx_.runOnce(pump.handleInCtx, pump, std::forward<Args>(args)...);
904  }
905  }
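 /**
  * Sketch (editor illustration): with pumpRunMode=manual, each pump can be
  * driven from a user-owned loop; myDomain is a hypothetical Domain instance.
  * @code
  * while (myDomain.runOnce(0)) {} // drive pump 0 until the Domain stops
  * @endcode
  */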
906 
907  /**
908  * @brief configure the subscriptions and advertisement for a Node
909  * @details typically this is done automatically when the Node is started;
910  * if the Node wants to do this step at an earlier or later stage, this can be
911  * called manually at that time. Thread-safe call.
912  *
913  * @tparam CcNode the Node type
914  * @param node the Node
915  */
916  template <typename CcNode>
917  void addPubSubFor(CcNode const& node) {
919  , "the node expecting messages Domain not covering");
920  if constexpr ((run_pump_in_ipc_portal || run_pump_in_thread_ctx)) {
921  for (uint16_t i = 0u; i < pumps_.size(); ++i) {
922  pumps_[i].template subscribeFor(node, (uint16_t)pumps_.size(), i);
923  pumps_[i].template advertiseFor(node, (uint16_t)pumps_.size(), i);
924  }
925  }
926  }
927 
928  /**
929  * @brief configure the advertisement with message types directly.
930  * Use this when you do not want to involve a Node for publishing; otherwise
931  * use addPubSubFor
932  * @tparam SendMessageTuple
933  */
934  template <typename SendMessageTuple>
935  void addPub() {
936  addPubSubFor(RegistrationNode<SendMessageTuple, std::tuple<>>{});
937  }
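 /**
  * Sketch (editor illustration; MyMsg is a hypothetical message type):
  * @code
  * myDomain.addPub<std::tuple<MyMsg>>(); // advertise without a publishing Node
  * @endcode
  */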
938 
939  /**
940  * @brief start a Node within this Domain as a thread - handles its subscribing here too
941  *
942  * @tparam Node a concrete Node type that send and/or recv Messages
943  * @param node the instance of the node - the Domain does not manage the object lifespan
944  * @param capacity the inbound message buffer depth - if the buffer is full the delivery mechanism
945  * is blocked until the buffer becomes available
946  * @param maxBlockingTime The node wakes up periodically even if there are no messages for it
947  * so its thread can respond to Domain status changes - like stopping
948  * @param cpuAffinity The CPU affinity mask that the Node's thread is assigned to
949  */
950  template <typename Node>
951  void start(Node& node
952  , size_t capacity = 1024
953  , time::Duration maxBlockingTime = time::Duration::seconds(1)
954  , uint64_t cpuAffinity = 0
955  ) {
956  if (std::tuple_size<typename Node::Interests>::value
957  && capacity == 0) {
958  HMBDC_THROW(std::out_of_range, "capacity cannot be 0 when receiving messages");
959  }
960  node.updateSubscription();
961  if constexpr (Node::manual_subscribe == false) {
962  addPubSubFor(node);
963  }
964 
965  threadCtx_.start(node, capacity, node.maxMessageSize()
966  , cpuAffinity, maxBlockingTime
967  , [&node](auto && ...args) {
968  return node.ifDeliver(std::forward<decltype(args)>(args)...);
969  }
970  );
971  }
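 /**
  * Sketch (editor illustration; MyNode is a hypothetical Node subclass):
  * @code
  * MyNode node, node2;
  * myDomain.start(node);  // defaults: 1024-deep buffer, 1s max blocking, no pinning
  * myDomain.start(node2, 4096
  *     , hmbdc::time::Duration::seconds(1)
  *     , 0x1ul);          // pin node2's thread to CPU 0
  * @endcode
  */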
972 
973  /**
974  * @brief if pumpRunMode is set to be delayed
975  * this function starts all the pumps
976  */
977  void startDelayedPumping() {
978  auto pumpRunMode = config_.getExt<std::string>("pumpRunMode");
979  if (pumpRunMode == "delayed") {
980  if constexpr (run_pump_in_ipc_portal) {
981  for (auto& pump : pumps_) {
982  ipcTransport_->start(pump
983  , config_.getHex<uint64_t>("pumpCpuAffinityHex"));
984  }
985  } else if constexpr (run_pump_in_thread_ctx) {
986  for (auto& pump : pumps_) {
987  threadCtx_.start(pump, 0, 0
988  , config_.getHex<uint64_t>("pumpCpuAffinityHex")
989  , config_.getHex<time::Duration>("pumpMaxBlockingTimeSec"));
990  }
991  } else {
992  HMBDC_THROW(std::runtime_error, "pumpRunMode=" << pumpRunMode);
993  }
994  }
995  }
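 /**
  * Sketch (editor illustration): with pumpRunMode=delayed, subscriptions can be
  * settled before any pumping starts; node is a hypothetical Node.
  * @code
  * myDomain.addPubSubFor(node);
  * myDomain.startDelayedPumping();
  * @endcode
  */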
996 
997  /**
998  * @brief start a group of Nodes as a thread pool within the Domain that collectively
999  * process messages in a load-sharing manner. Each Node is powered by a single OS thread.
1000  *
1001  * @tparam LoadSharingNodePtrIt iterator to a Node pointer
1002  *
1003  * @param begin an iterator, **begin should produce a Node& for the first Node
1004  * @param end an end iterator, [begin, end) is the range for Nodes
1005  * @param capacity the maximum messages this Node can buffer
1006  * @param cpuAffinity cpu affinity mask for this Node's thread
1007  * @param maxBlockingTime it is recommended to limit the duration a Node blocks when there are
1008  * no messages to handle, so it can respond to events like the Domain being stopped, or generate
1009  * heartbeats if applicable.
1010  */
1011  template <typename LoadSharingNodePtrIt>
1012  void startPool(LoadSharingNodePtrIt begin, LoadSharingNodePtrIt end
1013  , size_t capacity = 1024
1014  , time::Duration maxBlockingTime = time::Duration::seconds(1)
1015  , uint64_t cpuAffinity = 0) {
1016  using Node = typename std::decay<decltype(**LoadSharingNodePtrIt())>::type;
1017  auto maxItemSize = (*begin)->maxMessageSize();
1018 
1019  if (std::tuple_size<typename Node::Interests>::value
1020  && capacity == 0) {
1021  HMBDC_THROW(std::out_of_range, "capacity cannot be 0 when receiving messages");
1022  }
1023  for (auto it = begin; it != end; it++) {
1024  auto& node = **it;
1025  node.updateSubscription();
1026  if constexpr (Node::manual_subscribe == false) {
1027  addPubSubFor(node);
1028  }
1029  }
1030 
1031  threadCtx_.start(begin, end, capacity, maxItemSize, cpuAffinity, maxBlockingTime
1032  , [&node = **begin](auto && ...args) {
1033  return node.ifDeliver(std::forward<decltype(args)>(args)...);
1034  });
1035  }
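 /**
  * Sketch (editor illustration; WorkerNode is a hypothetical Node type whose
  * instances share the message load):
  * @code
  * WorkerNode w0, w1, w2;
  * WorkerNode* pool[] = {&w0, &w1, &w2};
  * myDomain.startPool(pool, pool + 3, 4096); // 3 worker threads
  * @endcode
  */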
1036 
1037 
1038  /**
1039  * @brief how many IPC parties (processes) have been detected by this context
1040  *
1041  * @return size_t
1042  */
1043  size_t ipcPartyDetectedCount() const {
1044  if constexpr (run_pump_in_ipc_portal) {
1045  return ipcTransport_->dispatchingStartedCount();
1046  }
1047  return 0;
1048  }
1049 
1050  /**
1051  * @brief how many network parties (processes) are ready to send messages to this Domain
1052  *
1053  * @return size_t
1054  */
1055  size_t netSendingPartyDetectedCount() const {
1056  if constexpr (has_a_pump) {
1057  size_t res = 0;
1058  for (auto& pump : pumps_) {
1059  res += pump.netSendingPartyDetectedCount();
1060  }
1061  return res;
1062  } else {
1063  return 0;
1064  }
1065  }
1066 
1067  /**
1068  * @brief how many network parties (processes) are ready to receive messages from this Domain
1069  *
1070  * @return size_t
1071  */
1072  size_t netRecvingPartyDetectedCount() const {
1073  if constexpr (has_a_pump) {
1074  size_t res = 0;
1075  for (auto& pump : pumps_) {
1076  res += pump.netRecvingPartyDetectedCount();
1077  }
1078  return res;
1079  }
1080  return 0;
1081  }
1082 
1083  /**
1084  * @brief how many nodes on the local machine have the message tag
1085  * marked as subscribed, excluding Nodes managed by this Domain instance
1086  *
1087  * @param tag
1088  * @return size_t
1089  */
1090  size_t ipcSubscribingPartyCount(uint16_t tag) const {
1091  return pumps_[tag % pumps_.size()].ipcSubscribingPartyCount(tag);
1092  }
1093 
1094  /**
1095  * @brief how many processes connecting through the network have the message tag
1096  * marked as subscribed
1097  *
1098  * @param tag
1099  * @return size_t
1100  */
1101  size_t netSubscribingPartyCount(uint16_t tag) const {
1102  return pumps_[tag % pumps_.size()].netSubscribingPartyCount(tag);
1103  }
1104 
1105  /**
1106  * @brief publish a message through this Domain; all the Nodes in the TIPS domain
1107  * could get it if they subscribed to the Message type
1108  * @details Messages that are not trivially destructible are not delivered outside
1109  * of this process (no IPC and network paths). Their delivery is attempted among
1110  * the Nodes within the local process only.
1111  * The DisableSendMask enum can also be used to disable local, IPC or network paths.
1112  *
1113  * @tparam Message The type that needs to fit into the max_message_size specified by the
1114  * ipc_property and net_property. Its copy ctor is used to push it to the outgoing buffer.
1115  * With that in mind, the user can do a partial copy using the copy ctor to publish just
1116  * the starting N bytes
1117  * @param m
1118  */
1119  template <MessageC Message>
1120  void publish(Message&& m) {
1121  bool disableInterThread = false;
1122  using M = typename std::decay<Message>::type;
1123  static_assert((int)M::typeSortIndex > (int)app::LastSystemMessage::typeTag);
1124 
1125  if constexpr (has_tipsDisableSendMask<M>::value) {
1126  disableInterThread = M::tipsDisableSendMask() & INTER_THREAD;
1127  }
1128  if (!disableInterThread) {
1129  threadCtx_.send(m);
1130  }
1131  if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1132  pumps_[m.getTypeTag() % pumps_.size()].send(std::forward<Message>(m));
1133  }
1134  }
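 /**
  * Sketch (editor illustration; MyMsg is a hypothetical message type):
  * @code
  * MyMsg m;
  * myDomain.publish(m);       // copied into the in-process, IPC and net buffers
  * myDomain.publish(MyMsg{}); // rvalues work as well
  * @endcode
  */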
1135 
1136  /**
1137  * @brief publish a message using byte format on this Domain - not recommended
1138  * @details Messages MUST be trivially destructible - no type-based delivery features
1139  * such as DisableSendMask are supported.
1140  * It is not recommended to publish with att while the tag is also subscribed locally
1141  * due to the complexity that att->release() will be called for each local recipient and
1142  * one additional time in the Domain. Publishing via hasSharedPtrAttachment is always preferred
1143  * @param tag - type tag
1144  * @param bytes - bytes of the message contents - must match the message already
1145  * constructed binary wise (hasMemoryAttachment, inTagRange must be considered
1146  * - user needs to add those leading bytes to match binary wise)
1147  * @param len - len of the above buffer
1148  * @param att - if the message type is derived from hasMemoryAttachment, explicitly
1149  * provide the ptr here, otherwise must be nullptr
1150  * it is the user's responsibility to make sure att->afterConsumedCleanupFunc works
1151  * even if att->release() is called multiple times
1152  */
1153  void publishJustBytes(uint16_t tag, void const* bytes, size_t len
1154  , app::hasMemoryAttachment* att) {
1155  if (tag > app::LastSystemMessage::typeTag) {
1156  threadCtx_.sendJustBytesInPlace(tag, bytes, len, att);
1157  if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1158  pumps_[tag % pumps_.size()].sendJustBytes(tag, bytes, len, att);
1159  }
1160  }
1161  }
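 /**
  * Sketch (editor illustration; MyMsg is a hypothetical trivially destructible
  * message type carrying no attachment):
  * @code
  * MyMsg m;
  * myDomain.publishJustBytes(MyMsg::typeTag, &m, sizeof(m), nullptr);
  * @endcode
  */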
1162 
1163  /**
1164  * @brief publish a sequence of message of the same type
1165  * through this Domain, all the Nodes in the TIPS domain
1166  * could get it if it subscribed to the Message type
1167  * @details It is recommended to use this method if publishing an array
1168  * of Messages (particularly to a Node pool - see startPool() above) for performance.
1169  * Messages that are not trivially destructible are not delivered outside
1170  * of this process (no IPC and network paths). Their delivery is still attempted among
1171  * the Nodes within the local process though
1172  *
1173  * @tparam Message
1174  * @param m
1175  */
1176  template <MessageForwardIterC ForwardIt>
1177  void publish(ForwardIt begin, size_t n) {
1178  using M = typename std::decay<decltype(*(ForwardIt()))>::type;
1179  bool disableInterThread = false;
1180  if constexpr (has_tipsDisableSendMask<M>::value) {
1181  disableInterThread = M::tipsDisableSendMask() & INTER_THREAD;
1182  }
1183  if (!disableInterThread) {
1184  threadCtx_.send(begin, n);
1185  }
1186  if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1187  while (n--) {
1188  auto& m = *begin;
1189  pumps_[m.getTypeTag() % pumps_.size()].send(*begin++);
1190  }
1191  }
1192  }
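 /**
  * Sketch (editor illustration): batch publishing an array of one message type
  * (MyMsg hypothetical):
  * @code
  * MyMsg batch[8];
  * myDomain.publish(batch, 8); // forward iterator + count
  * @endcode
  */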
1193 
1194  /**
1195  * @brief allocate in shm for a hasSharedPtrAttachment to be published later.
1196  * Its release is handled automatically in TIPS
1197  *
1198  * @tparam Message hasSharedPtrAttachment tparam
1199  * @tparam T hasSharedPtrAttachment tparam - it needs to contain "size_t hmbdc0cpyShmRefCount"
1200  * as the first data member - won't compile otherwise
1201  * @tparam Args args for T's ctor
1202  * @param att the message holds the shared memory
1203  * @param actualSize T's actual size in bytes - could be > sizeof(T) for open ended struct
1204  * @param args args for T's ctor
1205  */
1206  template <MessageC Message, typename T, typename ...Args>
1207  void allocateInShmFor0cpy(hasSharedPtrAttachment<Message, T>& att
1208  , size_t actualSize, Args&& ...args) {
1209  static_assert(offsetof(T, hmbdc0cpyShmRefCount) == 0);
1210  static_assert(std::is_same<decltype(T::hmbdc0cpyShmRefCount), size_t>::value);
1211  if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1212  if (pumps_.size() > 1) {
1213  HMBDC_THROW(std::runtime_error, "0cpy only works with single pump");
1214  }
1215  }
1216  att.attachmentSp = ipcTransport_->template allocateInShm<T>(
1217  actualSize, std::forward<Args>(args)...);
1218  att.attachmentSp->hmbdc0cpyShmRefCount = 1; /// sender
1219  att.len = actualSize;
1220  }
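 /**
  * Sketch (editor illustration; MyAtt is a hypothetical struct whose first
  * member is size_t hmbdc0cpyShmRefCount, MyMsg a hypothetical
  * hasSharedPtrAttachment message carrying it):
  * @code
  * MyMsg m; // derives from hasSharedPtrAttachment<MyMsg, MyAtt>
  * myDomain.allocateInShmFor0cpy(m, sizeof(MyAtt));
  * // fill *m.attachmentSp here, then:
  * myDomain.publish(m);
  * @endcode
  */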
1221 
1222  /**
1223  * @brief if the Domain object owns the IPC transport
1224  *
1225  * @return true if yes
1226  * @return false if no
1227  */
1228  bool ownIpcTransport() const {
1229  return ownIpcTransport_;
1230  }
1231 
1232  /**
1233  * @brief stop the Domain and its Message delivery functions
1234  * @details an async operation - use join() to see the effects
1235  */
1236  void stop() {
1237  if constexpr (run_pump_in_ipc_portal) {
1238  ipcTransport_->stop();
1239  }
1240  threadCtx_.stop();
1241 
1242  if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1243  for (auto& pump : pumps_) {
1244  pump.stop();
1245  }
1246  }
1247  }
1248 
1249  /**
1250  * @brief wait for all the Node threads to stop
1251  *
1252  */
1253  void join() {
1254  if constexpr (run_pump_in_ipc_portal) {
1255  ipcTransport_->join();
1256  }
1257  threadCtx_.join();
1258  }
1259 };
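/**
 * End-to-end sketch (editor illustration; MyMsg / MyNode are hypothetical, and
 * the tcpcast protocol header location and its SingletonGuardian usage are
 * assumptions based on the GuardedSingleton pattern above):
 * @code
 * using MyDomain = hmbdc::tips::Domain<std::tuple<MyMsg>
 *     , hmbdc::tips::ipc_property<16>
 *     , hmbdc::tips::net_property<hmbdc::tips::tcpcast::Protocol>>;
 * hmbdc::pattern::SingletonGuardian<hmbdc::tips::tcpcast::Protocol> g; // RAII protocol singleton
 * hmbdc::app::Config cfg; // fill in transport specifics (see DefaultUserConfig.hpp)
 * MyDomain domain{cfg};
 * MyNode node;
 * domain.start(node);
 * domain.publish(MyMsg{});
 * domain.stop();
 * domain.join();
 * @endcode
 */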
1260 
1261 }
1262 
1263 ///
1264 /// implementation details
1265 ///
1266 namespace app {
1267 template <MessageC Message>
1268 struct MessageWrap<tips::domain_detail::ipc_from<Message>> : MessageHead {
1269  template <typename ...Args>
1270  MessageWrap(pid_t from, Args&& ... args)
1271  : MessageHead(0) //delayed
1272  , payload(std::forward<Args>(args)...) {
1273  if constexpr (std::is_base_of<hasMemoryAttachment, Message>::value) {
1274  this->scratchpad().desc.flag = hasMemoryAttachment::flag;
1275  }
1276  if constexpr (Message::hasRange) {
1277  this->typeTag = payload.getTypeTag();
1278  } else {
1279  this->typeTag = Message::typeTag;
1280  }
1281  this->scratchpad().ipc.hd.from = from;
1282  }
1283 
1284  MessageWrap(tips::domain_detail::ipc_from<Message> const& m)
1285  : MessageHead(m.getTypeTag())
1286  , payload(m) {
1287  if constexpr (std::is_base_of<hasMemoryAttachment, Message>::value) {
1288  this->scratchpad().desc.flag = hasMemoryAttachment::flag;
1289  }
1290  this->scratchpad().ipc.hd.from = m.hmbdcIpcFrom;
1291  }
1292  Message payload;
1293 
1294  friend
1295  std::ostream& operator << (std::ostream& os, MessageWrap const & w) {
1296  return os << static_cast<MessageHead const&>(w) << ' ' << w.payload;
1297  }
1298 };
1299 
1300 template <MessageC Message>
1301 struct MessageWrap<InBandHasMemoryAttachment<
1302  tips::domain_detail::ipc_from<Message>>> : MessageHead {
1306  , payload(m) {
1307  // if constexpr (std::is_base_of<hasMemoryAttachment, Message>::value) {
1308  // this->scratchpad().desc.flag = hasMemoryAttachment::flag;
1309  // } no use conflicting ipc.from and this type is wellknown not an attachment
1310  //
1311  this->scratchpad().ipc.hd.from = m.hmbdcIpcFrom;
1312  this->scratchpad().ipc.hd.inbandUnderlyingTypeTag = m.getTypeTag();
1313  }
1314  InBandHasMemoryAttachment<Message> payload;
1315 
1316  friend
1317  std::ostream& operator << (std::ostream& os, MessageWrap const & w) {
1318  return os << static_cast<MessageHead const&>(w) << ' ' << w.payload;
1319  }
1320 };
1321 
1322 template<>
1323 struct MessageWrap<tips::domain_detail::ipc_from<JustBytes>> : MessageHead {
1324  MessageWrap(uint16_t tag, void const* bytes, size_t len, hasMemoryAttachment* att
1325  , pid_t hmbdcIpcFrom)
1326  : MessageHead(tag)
1327  , payload(bytes, len) {
1328  if (att) {
1329  this->scratchpad().desc.flag = hasMemoryAttachment::flag;
1330  }
1331  this->scratchpad().ipc.hd.from = hmbdcIpcFrom;
1332  }
1333  JustBytes payload;
1334 
1335  friend
1336  std::ostream& operator << (std::ostream& os, MessageWrap const & w) {
1337  return os << static_cast<MessageHead const&>(w) << " *";
1338  }
1339 };
1340 
1341 template<>
1342 struct MessageWrap<InBandHasMemoryAttachment<
1343  tips::domain_detail::ipc_from<JustBytes>>> : MessageHead {
1344  MessageWrap(uint16_t tag, void const* bytes, size_t len, hasMemoryAttachment* att
1345  , pid_t hmbdcIpcFrom)
1347  , payload(bytes, len) {
1348  // if constexpr (std::is_base_of<hasMemoryAttachment, Message>::value) {
1349  // this->scratchpad().desc.flag = hasMemoryAttachment::flag;
1350  // } no use conflicting ipc.from and this type is wellknown not an attachment
1351  //
1352  this->scratchpad().ipc.hd.from = hmbdcIpcFrom;
1353  this->scratchpad().ipc.hd.inbandUnderlyingTypeTag = tag;
1354  }
1355  InBandHasMemoryAttachment<JustBytes> payload;
1356 
1357  friend
1358  std::ostream& operator << (std::ostream& os, MessageWrap const & w) {
1359  return os << static_cast<MessageHead const&>(w) << ' ' << w.payload;
1360  }
1361 };
1362 
1363 } /// app
1364 } /// hmbdc
1365 
1366 