1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/tips/DefaultUserConfig.hpp" 4 #include "hmbdc/tips/Messages.hpp" 5 #include "hmbdc/tips/TypeTagSet.hpp" 6 #include "hmbdc/tips/Node.hpp" 7 #include "hmbdc/app/BlockingContext.hpp" 8 #include "hmbdc/app/Context.hpp" 9 #include "hmbdc/app/Config.hpp" 10 #include "hmbdc/Exception.hpp" 11 #include "hmbdc/pattern/GuardedSingleton.hpp" 12 #include "hmbdc/MetaUtils.hpp" 16 #include <type_traits> 20 namespace hmbdc {
namespace tips {
31 template <
typename Protocol,
size_t MaxMessageSize = 1000>
34 max_message_size = MaxMessageSize
36 using protocol = Protocol;
46 using SendTransportEngine =
void*;
47 template <
typename Buffer,
typename AttachmentAllocator>
48 using RecvTransportEngine =
void*;
58 return cfg.
getExt<std::string>(
"tipsDomainNonNet");
70 using NoNet = net_property<NoProtocol, 0>;
82 template <u
int16_t IpcCapacity = 64,
size_t MaxMessageSize = 1000>
85 capacity = IpcCapacity,
86 max_message_size = MaxMessageSize
96 namespace domain_detail {
98 template <MessageC Message, bool canSerialize = has_toHmbdcSerialized<Message>::value>
100 using type = std::result_of_t<decltype(&Message::toHmbdcSerialized)(Message)>;
103 template <MessageC Message>
105 using type = Message;
108 template <MessageTupleC MessageTuple>
113 using type = std::tuple<>;
116 template <MessageC Message>
119 value = (std::is_trivially_destructible<Message>::value
120 || has_toHmbdcSerialized<Message>::value) ? 1:0,
124 template <MessageC Message, MessageC ...Messages>
127 using type =
typename std::conditional<is_ipcable<Message>::value
133 template <MessageC Message>
135 ipc_from(pid_t from, Message
const& m)
137 , hmbdcIpcFrom(from) {}
158 att->attachment = ::malloc(att->len);
160 ::free(hasAtt->attachment);
161 hasAtt->attachment =
nullptr;
163 return att->attachment;
194 template <MessageTupleC RecvMessageTupleIn
195 ,
typename IpcProp = NoIpc
196 ,
typename NetProp = NoNet
198 ,
typename AttachmentAllocator = DefaultAttachmentAllocator >
200 using IpcProperty = IpcProp;
201 using NetProperty = NetProp;
202 using NetProtocol =
typename NetProperty::protocol;
204 using RecvMessageTuple =
typename hmbdc::remove_duplicate<RecvMessageTupleIn>::type;
205 using ThreadCtx = NodeContext<RecvMessageTuple>;
207 ThreadCtx threadCtx_;
210 using NetableRecvMessages = IpcableRecvMessages;
213 run_pump_in_ipc_portal = IpcProperty::capacity != 0,
214 run_pump_in_thread_ctx = run_pump_in_ipc_portal
215 ? 0 : !std::is_same<NoNet, NetProperty>::value,
216 has_a_pump = run_pump_in_ipc_portal || run_pump_in_thread_ctx,
217 has_net_send_eng = !std::is_same<NoNet, NetProperty>::value,
218 has_net_recv_eng = !std::is_same<NoNet, NetProperty>::value
219 && std::tuple_size<NetableRecvMessages>::value,
224 using IpcSubscribeMessagesPossible = IpcableRecvMessages;
225 using IpcTransport =
typename std::conditional<
226 IpcProperty::capacity != 0
228 (IpcProperty::max_message_size != 0
229 ? IpcProperty::max_message_size
237 std::optional<IpcTransport> ipcTransport_;
239 ThreadCtx& threadCtx;
240 size_t dispCount = 0;
243 OneBuffer(ThreadCtx& threadCtx,
size_t maxItemSizeIn)
244 : threadCtx(threadCtx)
245 , maxItemSize_(maxItemSizeIn) {
247 size_t maxItemSize()
const {
251 template <MessageC Message>
254 if constexpr (has_toHmbdcUnserialized<Message>::value) {
255 static_assert(std::is_same<Message
256 , decltype(msg.toHmbdcUnserialized().toHmbdcSerialized())>::value
257 ,
"mising or wrong toHmbdcSerialized() func - cannot serialize");
258 threadCtx.send(msg.toHmbdcUnserialized());
262 if constexpr(std::is_base_of<hasMemoryAttachment, Message>::value) {
264 msg.hasMemoryAttachment::attachment =
nullptr;
265 msg.hasMemoryAttachment::len = 0;
270 threadCtx.sendJustBytesInPlace(tag, bytes, maxItemSize_, att);
273 att->attachment =
nullptr;
278 void put(
void* item,
size_t) {
280 , NetableRecvMessages>::type;
283 h.scratchpad().desc.flag = 0;
284 if (disp(*
this, h)) {
291 , NetableRecvMessages>::type;
293 item->scratchpad().desc.flag = app::hasMemoryAttachment::flag;
294 if (disp(*
this, *item)) {
298 item->payload.release();
302 template <
typename T>
void put(T& item) {put(&item,
sizeof(T));}
303 template <
typename T>
void putSome(T& item) {
304 put(&item, std::min(
sizeof(item), maxItemSize()));
310 std::optional<typename NetProtocol::SendTransportEngine> sendEng_;
311 using RecvTransportEngine
312 =
typename NetProtocol::template RecvTransportEngine<OneBuffer, AttachmentAllocator>;
313 std::optional<RecvTransportEngine> recvEng_;
316 std::string hmbdcName_;
319 typename ThreadCtx::ClientRegisterHandle handleInCtx;
324 , NetProperty::max_message_size != 0
325 ? NetProperty::max_message_size
326 : cfg.
getExt<uint32_t>(
"netMaxMessageSizeRuntime")
328 , hmbdcName_(cfg.
getExt<std::string>(
"pumpHmbdcName")) {
329 if constexpr (has_net_send_eng) {
330 uint32_t limit = NetProperty::max_message_size;
332 limit = cfg.
getExt<uint32_t>(
"netMaxMessageSizeRuntime");
334 sendEng_.emplace(cfg, limit);
336 if constexpr (has_net_recv_eng) {
337 recvEng_.emplace(cfg, netBuffer_);
341 template <
typename CcNode>
342 void subscribeFor(CcNode
const& node, uint16_t mod, uint16_t res) {
343 if constexpr (has_net_recv_eng) {
345 ,
typename CcNode::RecvMessageTuple>::type;
346 recvEng_->template subscribeFor<Messages>(node, mod, res);
350 size_t ipcSubscribingPartyCount(uint16_t tag)
const {
354 size_t netSubscribingPartyCount(uint16_t tag)
const {
355 auto res =
size_t{0};
356 if constexpr (has_net_send_eng) {
357 res += sendEng_->subscribingPartyDetectedCount(tag);
362 template <
typename CcNode>
363 void advertiseFor(CcNode
const& node, uint16_t mod, uint16_t res) {
364 if constexpr (has_net_send_eng) {
366 ,
typename CcNode::SendMessageTuple>::type;
367 sendEng_->template advertiseFor<Messages>(node, mod, res);
371 size_t netSendingPartyDetectedCount()
const {
372 if constexpr (has_net_recv_eng) {
373 return recvEng_->sessionsRemainingActive();
378 size_t netRecvingPartyDetectedCount()
const {
379 if constexpr (has_net_send_eng) {
380 return sendEng_->sessionsRemainingActive();
385 template <MessageC Message>
386 void send(Message&& message) {
387 using M =
typename std::decay<Message>::type;
388 using Mnet =
typename domain_detail::matching_ipcable<M>::type;
390 if constexpr (std::is_trivially_destructible<Mnet>::value) {
391 if constexpr (has_tipsDisableSendMask<M>::value) {
392 if (M::tipsDisableSendMask() & OVER_NETWORK)
return;
394 if constexpr (has_net_send_eng) {
395 if constexpr(has_toHmbdcSerialized<M>::value) {
396 static_assert(std::is_same<M
397 , decltype(message.toHmbdcSerialized().toHmbdcUnserialized())>::value
398 ,
"mising or wrong toHmbdcUnserialized() func - cannot convertback");
399 static_assert(NetProperty::max_message_size == 0
400 ||
sizeof(decltype(message.toHmbdcSerialized())) <= NetProperty::max_message_size
401 ,
"NetProperty::max_message_size is too small");
402 sendEng_->queue(message.toHmbdcSerialized());
404 static_assert(NetProperty::max_message_size == 0
405 ||
sizeof(message) <= NetProperty::max_message_size
406 ,
"NetProperty::max_message_size is too small");
407 sendEng_->queue(message);
413 void sendJustBytes(uint16_t tag,
void const* bytes,
size_t len
415 if constexpr (has_net_send_eng) {
416 sendEng_->queueJustBytes(tag, bytes, len, att);
420 char const* hmbdcName()
const {
421 return hmbdcName_.c_str();
424 void invokedCb(
size_t previousBatch) {
425 bool layback = !previousBatch;
426 if constexpr (has_net_send_eng) {
427 layback = layback && !sendEng_->bufferedMessageCount();
430 std::this_thread::yield();
433 if constexpr (has_net_recv_eng) {
434 netBuffer_.dispCount = 0;
437 if constexpr (has_net_send_eng) {
443 if constexpr (has_net_send_eng) {
449 template <
size_t MAX_MEMORY_ATTACHMENT>
453 uint8_t underlyingMessage[MAX_MEMORY_ATTACHMENT];
454 AttachmentAllocator attAllocator;
456 template <MessageC Message>
458 static_assert(
sizeof(Message) <= MAX_MEMORY_ATTACHMENT,
"");
459 if (hmbdc_unlikely(att->attachment)) {
460 HMBDC_THROW(std::logic_error,
"previous InBandMemoryAttachment not concluded");
463 if constexpr(Message::justBytes) {
464 memcpy(underlyingMessage, &ibma.underlyingMessage,
sizeof(underlyingMessage));
466 memcpy(underlyingMessage, &ibma.underlyingMessage,
sizeof(Message));
469 if (att->holdShmHandle<Message>()) {
470 if constexpr (app::has_hmbdcShmRefCount<Message>::value) {
471 auto shmAddr = hmbdcShmHandleToAddr(att->
shmHandle);
472 att->attachment = shmAddr;
473 att->
clientData[0] = (uint64_t)&hmbdcShmDeallocator;
474 static_assert(
sizeof(ibma.underlyingMessage.hmbdcShmRefCount) ==
sizeof(
size_t));
475 att->
clientData[1] = (uint64_t)&ibma.underlyingMessage.hmbdcShmRefCount;
476 if constexpr (Message::is_att_0cpyshm) {
478 auto hmbdcShmRefCount = (
size_t*)h->attachment;
480 && 0 == __atomic_sub_fetch(hmbdcShmRefCount, 1, __ATOMIC_RELEASE)) {
481 auto& hmbdcShmDeallocator
482 = *(std::function<void (uint8_t*)>*)h->clientData[0];
483 hmbdcShmDeallocator((uint8_t*)h->attachment);
486 }
else if constexpr (app::has_hmbdcShmRefCount<Message>::value) {
488 auto hmbdcShmRefCount = (
size_t*)h->clientData[1];
490 && 0 == __atomic_sub_fetch(hmbdcShmRefCount, 1, __ATOMIC_RELEASE)) {
491 auto& hmbdcShmDeallocator
492 = *(std::function<void (uint8_t*)>*)h->clientData[0];
493 hmbdcShmDeallocator((uint8_t*)h->attachment);
501 attAllocator(typeTagIn, att);
510 if (accSize < att->len) {
511 auto attBytes = ibms.seg;
512 auto copySize = std::min(n, att->len - accSize);
513 memcpy((
char*)att->attachment + accSize, attBytes, copySize);
515 return accSize == att->len;
520 uint16_t typeTag = 0;
522 std::function<void* (boost::interprocess::managed_shared_memory::handle_t)>
523 hmbdcShmHandleToAddr;
524 std::function<void (uint8_t*)> hmbdcShmDeallocator;
529 std::optional<typename NetProtocol::SendTransportEngine> sendEng_;
530 using RecvTransportEngine
531 =
typename NetProtocol::template RecvTransportEngine<OneBuffer, AttachmentAllocator>;
532 std::optional<RecvTransportEngine> recvEng_;
533 IpcTransport& ipcTransport_;
539 std::string hmbdcName_;
540 uint32_t pumpMaxBlockingTimeSec_;
544 std::max((
size_t)PumpInIpcPortal::MAX_MEMORY_ATTACHMENT, (
size_t)(64 * 1024))> ibmaProc;
545 pid_t
const hmbdcAvoidIpcFrom = getpid();
548 : ipcTransport_(ipcTransport)
551 , NetProperty::max_message_size != 0
552 ? NetProperty::max_message_size
553 : cfg.
getExt<uint32_t>(
"netMaxMessageSizeRuntime")
555 , allocator_((NetProtocol::instance().getTipsDomainName(cfg) +
"-ipcsubs").c_str()
557 ,
sizeof(*pOutboundSubscriptions_)
559 , pOutboundSubscriptions_(
560 allocator_.template allocate<TypeTagSet>(SMP_CACHE_BYTES))
561 , hmbdcName_(cfg.
getExt<std::string>(
"pumpHmbdcName"))
562 , pumpMaxBlockingTimeSec_(cfg.
getHex<
double>(
"pumpMaxBlockingTimeSec") * 1000000) {
563 pumpMaxBlockingTimeSec_ = std::min(1000000u, pumpMaxBlockingTimeSec_);
564 static_assert(IpcTransport::MAX_MESSAGE_SIZE == 0 || IpcTransport::MAX_MESSAGE_SIZE
567 if constexpr (has_net_send_eng) {
568 uint32_t limit = NetProperty::max_message_size;
570 limit = cfg.
getExt<uint32_t>(
"netMaxMessageSizeRuntime");
572 sendEng_.emplace(cfg, limit);
574 if constexpr (has_net_recv_eng) {
575 recvEng_.emplace(cfg, netBuffer_);
579 template <
typename CcNode>
580 void subscribeFor(CcNode
const& node, uint16_t mod, uint16_t res) {
582 ,
typename CcNode::RecvMessageTuple>::type;
583 inboundSubscriptions_.addSubsFor<Messages>(node, mod, res
584 , [
this](uint16_t tag) {
585 pOutboundSubscriptions_->add(tag);
588 if constexpr (has_net_recv_eng) {
589 recvEng_->template subscribeFor<Messages>(node, mod, res);
593 size_t ipcSubscribingPartyCount(uint16_t tag)
const {
594 return pOutboundSubscriptions_->check(tag) - inboundSubscriptions_.check(tag);
597 size_t netSubscribingPartyCount(uint16_t tag)
const {
598 auto res =
size_t{0};
599 if constexpr (has_net_send_eng) {
600 res += sendEng_->subscribingPartyDetectedCount(tag);
605 template <
typename CcNode>
606 void advertiseFor(CcNode
const& node, uint16_t mod, uint16_t res) {
608 ,
typename CcNode::SendMessageTuple>::type;
609 if constexpr (has_net_send_eng) {
610 sendEng_->template advertiseFor<Messages>(node, mod, res);
614 size_t netSendingPartyDetectedCount()
const {
615 if constexpr (has_net_recv_eng) {
616 return recvEng_->sessionsRemainingActive();
621 size_t netRecvingPartyDetectedCount()
const {
622 if constexpr (has_net_send_eng) {
623 return sendEng_->sessionsRemainingActive();
629 template <MessageC Message>
630 void send(Message&& message) {
631 using M =
typename std::decay<Message>::type;
632 using Mipc =
typename domain_detail::matching_ipcable<M>::type;
634 if constexpr (std::is_trivially_destructible<Mipc>::value) {
635 bool disableInterProcess =
false;
636 if constexpr (has_tipsDisableSendMask<M>::value) {
637 if (M::tipsDisableSendMask() & INTER_PROCESS) {
638 disableInterProcess =
true;
641 bool disableNet =
false;
642 if constexpr (!has_net_send_eng) {
644 }
else if constexpr (has_tipsDisableSendMask<M>::value) {
645 if (M::tipsDisableSendMask() & OVER_NETWORK) disableNet =
true;
648 std::optional<Mipc> serializedCached;
649 app::hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFuncKept =
nullptr;
650 (void)afterConsumedCleanupFuncKept;
651 if (!disableInterProcess) {
653 auto intDiff = pOutboundSubscriptions_->check(message.getTypeTag())
654 - inboundSubscriptions_.check(message.getTypeTag());
658 if constexpr (has_toHmbdcSerialized<M>::value) {
659 static_assert(std::is_same<M
660 , decltype(message.toHmbdcSerialized().toHmbdcUnserialized())>::value
661 ,
"mising or wrong toHmbdcUnserialized() func - cannot convertback");
662 serializedCached.emplace(message.toHmbdcSerialized());
665 std::swap(afterConsumedCleanupFuncKept, serializedCached->afterConsumedCleanupFunc);
667 auto toSend = ToSendType{hmbdcAvoidIpcFrom, *serializedCached};
669 if constexpr (app::has_hmbdcShmRefCount<ToSendType>::value) {
670 toSend.hmbdcShmRefCount = intDiff;
671 if constexpr (ToSendType::is_att_0cpyshm) {
672 auto hmbdc0cpyShmRefCount = (
size_t*)toSend.app::hasMemoryAttachment::attachment;
673 __atomic_add_fetch(hmbdc0cpyShmRefCount, intDiff, __ATOMIC_RELEASE);
676 ipcTransport_.send(std::move(toSend));
677 }
else if constexpr(std::is_base_of<app::hasMemoryAttachment, M>::value) {
678 auto toSend = ToSendType{hmbdcAvoidIpcFrom, message};
679 if constexpr (app::has_hmbdcShmRefCount<ToSendType>::value) {
680 toSend.hmbdcShmRefCount = intDiff;
681 if constexpr (ToSendType::is_att_0cpyshm) {
682 auto hmbdc0cpyShmRefCount = (
size_t*)toSend.app::hasMemoryAttachment::attachment;
683 __atomic_add_fetch(hmbdc0cpyShmRefCount, intDiff, __ATOMIC_RELEASE);
686 ipcTransport_.send(std::move(toSend));
688 ipcTransport_.template sendInPlace<ToSendType>(hmbdcAvoidIpcFrom, message);
693 if (disableNet)
return;
695 if constexpr (has_net_send_eng) {
696 if constexpr(has_toHmbdcSerialized<M>::value) {
698 static_assert(NetProperty::max_message_size == 0
699 ||
sizeof(Mipc) <= NetProperty::max_message_size
700 ,
"NetProperty::max_message_size is too small");
701 if (serializedCached) {
703 std::swap(afterConsumedCleanupFuncKept, serializedCached->afterConsumedCleanupFunc);
704 sendEng_->queue(*serializedCached);
706 sendEng_->queue(message.toHmbdcSerialized());
709 static_assert(NetProperty::max_message_size == 0
710 ||
sizeof(Mipc) <= NetProperty::max_message_size
711 ,
"NetProperty::max_message_size is too small");
712 sendEng_->queue(message);
718 void sendJustBytes(uint16_t tag,
void const* bytes,
size_t len
720 app::hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFuncKept
721 = att ? att->afterConsumedCleanupFunc :
nullptr;
722 (void)afterConsumedCleanupFuncKept;
724 if (pOutboundSubscriptions_->check(tag) > inboundSubscriptions_.check(tag)) {
726 if (has_net_send_eng && att) {
728 att->afterConsumedCleanupFunc =
nullptr;
730 ipcTransport_.template sendJustBytesInPlace<ipc_from<app::JustBytes>>(
731 tag, bytes, len, att, hmbdcAvoidIpcFrom);
734 if constexpr (has_net_send_eng) {
736 att->afterConsumedCleanupFunc = afterConsumedCleanupFuncKept;
738 sendEng_->queueJustBytes(tag, bytes, len, att);
742 char const* hmbdcName()
const {
743 return hmbdcName_.c_str();
746 void invokedCb(
size_t previousBatch) {
747 bool layback = !previousBatch;
748 if constexpr (has_net_send_eng) {
749 layback = layback && !sendEng_->bufferedMessageCount();
752 if (pumpMaxBlockingTimeSec_) {
753 usleep(pumpMaxBlockingTimeSec_);
755 std::this_thread::yield();
758 if constexpr (has_net_recv_eng) {
761 if constexpr (has_net_send_eng) {
766 template <MessageC Message>
767 void handleMessageCb(Message& m) {
768 if (hmbdc_unlikely(!inboundSubscriptions_.check(m.getTypeTag()))) {
773 if constexpr (has_toHmbdcUnserialized<Message>::value) {
774 static_assert(std::is_same<Message
775 , decltype(msg.toHmbdcUnserialized().toHmbdcSerialized())>::value
776 ,
"mising or wrong toHmbdcSerialized() func - cannot serialize");
777 outCtx_.send(msg.toHmbdcUnserialized());
781 if constexpr(std::is_base_of<hasMemoryAttachment, Message>::value) {
782 msg.hasMemoryAttachment::attachment =
nullptr;
783 msg.hasMemoryAttachment::len = 0;
788 if (hmbdc_unlikely(!inboundSubscriptions_.check(tag))) {
791 outCtx_.sendJustBytesInPlace(tag, bytes, ipcTransport_.maxMessageSize(), att);
793 att->attachment =
nullptr;
799 if constexpr (has_net_send_eng) {
806 using Pump =
typename std::conditional<
807 run_pump_in_ipc_portal
809 ,
typename std::conditional<
810 run_pump_in_thread_ctx
815 std::deque<Pump> pumps_;
817 bool ownIpcTransport_ =
false;
829 auto pumpRunMode = config_.
getExt<std::string>(
"pumpRunMode");
830 if constexpr (run_pump_in_ipc_portal) {
831 ownIpcTransport_ = config_.
getExt<
bool>(
"createIpcTransportIfNotExist");
832 ipcTransport_.emplace(ownIpcTransport_
833 , NetProtocol::instance().getTipsDomainName(config_).c_str()
834 , config_.
getExt<uint32_t>(
"ipcMessageQueueSizePower2Num")
836 , IpcTransport::MAX_MESSAGE_SIZE != 0
837 ? IpcTransport::MAX_MESSAGE_SIZE
838 : config_.
getExt<
size_t>(
"ipcMaxMessageSizeRuntime")
840 , 0xfffffffffffffffful
842 , config_.
getExt<
size_t>(
"ipcShmForAttPoolSize")
845 ipcTransport_->setSecondsBetweenPurge(
846 config_.
getExt<uint32_t>(
"ipcPurgeIntervalSeconds"));
848 auto pumpCount = config_.
getExt<uint32_t>(
"pumpCount");
849 if (pumpCount > 64) {
850 HMBDC_THROW(std::out_of_range,
"pumpCount > 64 is not suppported");
853 for (
auto i = 0u; i < config_.
getExt<uint32_t>(
"pumpCount"); ++i) {
854 auto& pump = pumps_.emplace_back(
ownIpcTransport, *ipcTransport_, threadCtx_, config_);
855 if (pumpRunMode ==
"auto") {
856 ipcTransport_->start(pump
857 , config_.
getHex<uint64_t>(
"pumpCpuAffinityHex"));
858 }
else if (pumpRunMode ==
"manual") {
859 ipcTransport_->registerToRun(pump
860 , config_.
getHex<uint64_t>(
"pumpCpuAffinityHex"));
861 }
else if (pumpRunMode ==
"delayed") {
863 HMBDC_THROW(std::out_of_range,
"pumpRunMode=" << pumpRunMode <<
" not supported");
866 }
else if constexpr (run_pump_in_thread_ctx) {
867 for (
auto i = 0u; i < config_.
getExt<uint32_t>(
"pumpCount"); ++i) {
868 auto& pump = pumps_.emplace_back(threadCtx_, config_);
869 if (pumpRunMode ==
"auto") {
870 threadCtx_.start(pump, 0, 0
871 , config_.
getHex<uint64_t>(
"pumpCpuAffinityHex")
873 }
else if (pumpRunMode ==
"manual") {
874 pump.handleInCtx = threadCtx_.registerToRun(pump, 0, 0);
875 }
else if (pumpRunMode ==
"delayed") {
877 HMBDC_THROW(std::out_of_range,
"pumpRunMode=" << pumpRunMode <<
" not supported");
897 template <
typename... Args>
898 bool runOnce(
size_t pumpIndex, Args&& ...args) {
899 auto& pump = pumps_[pumpIndex];
900 if constexpr (run_pump_in_ipc_portal) {
901 return ipcTransport_->runOnce(pump, std::forward<Args>(args)...);
902 }
else if constexpr (run_pump_in_thread_ctx) {
903 return threadCtx_.runOnce(pump.handle, pump, std::forward<Args>(args)...);
916 template <
typename CcNode>
919 ,
"the node expecting messages Domain not covering");
920 if constexpr ((run_pump_in_ipc_portal || run_pump_in_thread_ctx)) {
921 for (uint16_t i = 0u; i < pumps_.size(); ++i) {
922 pumps_[i].template subscribeFor(node, (uint16_t)pumps_.size(), i);
923 pumps_[i].template advertiseFor(node, (uint16_t)pumps_.size(), i);
934 template <
typename SendMessageTuple>
950 template <
typename Node>
952 ,
size_t capacity = 1024
954 , uint64_t cpuAffinity = 0
956 if (std::tuple_size<typename Node::Interests>::value
958 HMBDC_THROW(std::out_of_range,
"capacity cannot be 0 when receiving messages");
960 node.updateSubscription();
961 if constexpr (Node::manual_subscribe ==
false) {
965 threadCtx_.start(node, capacity, node.maxMessageSize()
966 , cpuAffinity, maxBlockingTime
967 , [&node](
auto && ...args) {
968 return node.ifDeliver(std::forward<decltype(args)>(args)...);
978 auto pumpRunMode = config_.
getExt<std::string>(
"pumpRunMode");
979 if (pumpRunMode ==
"delayed") {
980 if constexpr (run_pump_in_ipc_portal) {
981 for (
auto& pump : pumps_) {
982 ipcTransport_->start(pump
983 , config_.
getHex<uint64_t>(
"pumpCpuAffinityHex"));
985 }
else if constexpr (run_pump_in_thread_ctx) {
986 for (
auto& pump : pumps_) {
987 threadCtx_.start(pump, 0, 0
988 , config_.
getHex<uint64_t>(
"pumpCpuAffinityHex")
992 HMBDC_THROW(std::runtime_error,
"pumpRunMode=" << pumpRunMode);
1011 template <
typename LoadSharingNodePtrIt>
1012 void startPool(LoadSharingNodePtrIt begin, LoadSharingNodePtrIt end
1013 ,
size_t capacity = 1024
1015 , uint64_t cpuAffinity = 0) {
1016 using Node =
typename std::decay<decltype(**LoadSharingNodePtrIt())>::type;
1017 auto maxItemSize = (*begin)->maxMessageSize();
1019 if (std::tuple_size<typename Node::Interests>::value
1021 HMBDC_THROW(std::out_of_range,
"capacity cannot be 0 when receiving messages");
1023 for (
auto it = begin; it != end; it++) {
1025 node.updateSubscription();
1026 if constexpr (Node::manual_subscribe ==
false) {
1031 threadCtx_.start(begin, end, capacity, maxItemSize, cpuAffinity, maxBlockingTime
1032 , [&node = **begin](
auto && ...args) {
1033 return node.ifDeliver(std::forward<decltype(args)>(args)...);
1044 if constexpr (run_pump_in_ipc_portal) {
1045 return ipcTransport_->dispatchingStartedCount();
1056 if constexpr (has_a_pump) {
1058 for (
auto& pump : pumps_) {
1059 res += pump.netSendingPartyDetectedCount();
1073 if constexpr (has_a_pump) {
1075 for (
auto& pump : pumps_) {
1076 res += pump.netRecvingPartyDetectedCount();
1091 return pumps_[tag % pumps_.size()].ipcSubscribingPartyCount(tag);
1102 return pumps_[tag % pumps_.size()].netSubscribingPartyCount(tag);
1119 template <MessageC Message>
1121 bool disableInterThread =
false;
1122 using M =
typename std::decay<Message>::type;
1123 static_assert((
int)M::typeSortIndex > (
int)app::LastSystemMessage::typeTag);
1125 if constexpr (has_tipsDisableSendMask<M>::value) {
1126 disableInterThread = M::tipsDisableSendMask() & INTER_THREAD;
1128 if (!disableInterThread) {
1131 if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1132 pumps_[m.getTypeTag() % pumps_.size()].send(std::forward<Message>(m));
1155 if (tag > app::LastSystemMessage::typeTag) {
1156 threadCtx_.sendJustBytesInPlace(tag, bytes, len, att);
1157 if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1158 pumps_[tag % pumps_.size()].sendJustBytes(tag, bytes, len, att);
1176 template <MessageForwardIterC ForwardIt>
1178 using M =
typename std::decay<decltype(*(ForwardIt()))>::type;
1179 bool disableInterThread =
false;
1180 if constexpr (has_tipsDisableSendMask<M>::value) {
1181 disableInterThread = M::tipsDisableSendMask() & INTER_THREAD;
1183 if (!disableInterThread) {
1184 threadCtx_.send(begin, n);
1186 if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1189 pumps_[m.getTypeTag() % pumps_.size()]->send(*begin++);
1206 template <MessageC Message,
typename T,
typename ...Args>
1208 ,
size_t actualSize, Args&& ...args) {
1209 static_assert(offsetof(T, hmbdc0cpyShmRefCount) == 0);
1210 static_assert(std::is_same<decltype(T::hmbdc0cpyShmRefCount),
size_t>::value);
1211 if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1212 if (pumps_.size() > 1) {
1213 HMBDC_THROW(std::runtime_error,
"0cpy only works with single pump");
1216 att.attachmentSp = ipcTransport_->template allocateInShm<T>(
1217 actualSize, std::forward<Args>(args)...);
1218 att.attachmentSp->hmbdc0cpyShmRefCount = 1;
1219 att.len = actualSize;
1229 return ownIpcTransport_;
1237 if constexpr (run_pump_in_ipc_portal) {
1238 ipcTransport_->stop();
1242 if constexpr (run_pump_in_ipc_portal || run_pump_in_thread_ctx) {
1243 for (
auto& pump : pumps_) {
1254 if constexpr (run_pump_in_ipc_portal) {
1255 ipcTransport_->join();
1267 template <MessageC Message>
1269 template <
typename ...Args>
1272 , payload(std::forward<Args>(args)...) {
1273 if constexpr (std::is_base_of<hasMemoryAttachment, Message>::value) {
1274 this->scratchpad().desc.flag = hasMemoryAttachment::flag;
1276 if constexpr (Message::hasRange) {
1277 this->typeTag = payload.getTypeTag();
1279 this->typeTag = Message::typeTag;
1281 this->scratchpad().ipc.hd.from = from;
1287 if constexpr (std::is_base_of<hasMemoryAttachment, Message>::value) {
1288 this->scratchpad().desc.flag = hasMemoryAttachment::flag;
1290 this->scratchpad().ipc.hd.from = m.hmbdcIpcFrom;
1295 std::ostream& operator << (std::ostream& os,
MessageWrap const & w) {
1296 return os << static_cast<MessageHead const&>(w) <<
' ' << w.payload;
1300 template <MessageC Message>
1302 tips::domain_detail::ipc_from<Message>>> :
MessageHead {
1311 this->scratchpad().ipc.hd.from = m.hmbdcIpcFrom;
1312 this->scratchpad().ipc.hd.inbandUnderlyingTypeTag = m.getTypeTag();
1317 std::ostream& operator << (std::ostream& os,
MessageWrap const & w) {
1318 return os << static_cast<MessageHead const&>(w) <<
' ' << w.payload;
1325 , pid_t hmbdcIpcFrom)
1327 , payload(bytes, len) {
1329 this->scratchpad().desc.flag = hasMemoryAttachment::flag;
1331 this->scratchpad().ipc.hd.from = hmbdcIpcFrom;
1336 std::ostream& operator << (std::ostream& os,
MessageWrap const & w) {
1337 return os << static_cast<MessageHead const&>(w) <<
" *";
1343 tips::domain_detail::ipc_from<JustBytes>>> :
MessageHead {
1345 , pid_t hmbdcIpcFrom)
1347 , payload(bytes, len) {
1352 this->scratchpad().ipc.hd.from = hmbdcIpcFrom;
1353 this->scratchpad().ipc.hd.inbandUnderlyingTypeTag = tag;
1358 std::ostream& operator << (std::ostream& os,
MessageWrap const & w) {
1359 return os << static_cast<MessageHead const&>(w) <<
' ' << w.payload;
T getExt(const path_type ¶m, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: Domain.hpp:527
std::string getTipsDomainName(app::Config const &cfg)
construct the host-wide unique TIPS domain name from a configure file
Definition: Domain.hpp:57
Definition: MetaUtils.hpp:252
Definition: Domain.hpp:450
class to hold an hmbdc configuration
Definition: Config.hpp:45
void setAdditionalFallbackConfig(Config const &c)
set additional defaults
Definition: Config.hpp:154
void startDelayedPumping()
if pumpRunMode is set to be delayed this function start all the pumps
Definition: Domain.hpp:977
boost::interprocess::managed_shared_memory ::handle_t shmHandle
Definition: Message.hpp:169
void publish(Message &&m)
publish a message through this Domain, all the Nodes in the TIPS domain could get it if it subscribed...
Definition: Domain.hpp:1120
Definition: TypedString.hpp:84
Placeholder for the Protocol within net_property that turns off network communication at compile time...
Definition: Domain.hpp:44
bool runOnce(size_t pumpIndex, Args &&...args)
manually drive the domain's pumps
Definition: Domain.hpp:898
size_t netSubscribingPartyCount(uint16_t tag) const
how many processes connecting thru network have the message tag marked as subscribed ...
Definition: Domain.hpp:1101
Definition: Domain.hpp:109
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:53
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
size_t netSendingPartyDetectedCount() const
how many network parties (processes) are ready to send messages to this Domain
Definition: Domain.hpp:1055
Definition: Domain.hpp:99
Context template parameter indicating the Context is ipc enabled and it can create or be attached to ...
Definition: Context.hpp:108
T getHex(ptree::path_type const ¶m) const
get a number value in hex format
Definition: Config.hpp:299
void publishJustBytes(uint16_t tag, void const *bytes, size_t len, app::hasMemoryAttachment *att)
publish a message using byte format on this Domain - not recommended
Definition: Domain.hpp:1153
void addPubSubFor(CcNode const &node)
configure the subscriptions and advertisement for a Node
Definition: Domain.hpp:917
Definition: MetaUtils.hpp:295
void startPool(LoadSharingNodePtrIt begin, LoadSharingNodePtrIt end, size_t capacity=1024, time::Duration maxBlockingTime=time::Duration::seconds(1), uint64_t cpuAffinity=0)
start a group of Nodes as a thread pool within the Domain that collectively processing messages in a ...
Definition: Domain.hpp:1012
Definition: MessageDispacher.hpp:184
template that Domain uses for the IPC communication properties
Definition: Domain.hpp:83
void join()
wait for all the Node threads to stop
Definition: Domain.hpp:1253
Definition: MetaUtils.hpp:239
Definition: Domain.hpp:308
A BlockingContext is like a media object that facilitates the communications for the Clients that it ...
Definition: BlockingContext.hpp:203
when Domain (its pumps) receives a hasMemoryAttachment, there is a need to allocate desired memory t...
Definition: Domain.hpp:148
Definition: Domain.hpp:238
Definition: Message.hpp:212
size_t ipcPartyDetectedCount() const
how many IPC parties (processes) have been detected by this context
Definition: Domain.hpp:1043
bool cache(app::InBandHasMemoryAttachment< Message > const &ibma, uint16_t typeTagIn)
Definition: Domain.hpp:457
void handleMessageCb(Message &msg)
Definition: Domain.hpp:252
A special type of message only used on the receiving side.
Definition: Message.hpp:341
Messages published on a TIPS pub/sub domain reach all the Nodes within that domain based on their sub...
Definition: Domain.hpp:199
Definition: Domain.hpp:134
void * operator()(uint16_t typeTag, app::hasMemoryAttachment *att)
fill in hasMemoryAttachment so it holds the desired memory and the incoming attachment can be holden ...
Definition: Domain.hpp:157
Definition: Client.hpp:288
size_t ipcSubscribingPartyCount(uint16_t tag) const
how many nodes on local machine have the message tag marked as subscribed excluding Nodes managed by ...
Definition: Domain.hpp:1090
Context template parameter indicating each message is sent to all clients within the Context...
Definition: Context.hpp:57
Definition: Message.hpp:459
Domain(app::Config const &cfg)
Construct a new Domain object.
Definition: Domain.hpp:826
void putAtt(app::MessageWrap< app::hasMemoryAttachment > *item, size_t)
Definition: Domain.hpp:289
void allocateInShmFor0cpy(hasSharedPtrAttachment< Message, T > &att, size_t actualSize, Args &&...args)
allocate in shm for a hasSharedPtrAttachment to be published later. The release of it is auto handled...
Definition: Domain.hpp:1207
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:757
Definition: MetaUtils.hpp:67
Definition: Message.hpp:263
a Node is a thread of execution that can subscribe and receive Messages
Definition: Node.hpp:51
Definition: MetaUtils.hpp:100
template that Domain uses for the network communication properties
Definition: Domain.hpp:32
void publish(ForwardIt begin, size_t n)
publish a sequence of message of the same type through this Domain, all the Nodes in the TIPS domain ...
Definition: Domain.hpp:1177
Definition: MetaUtils.hpp:226
bool ownIpcTransport() const
if the Domain object owns the IPC transport
Definition: Domain.hpp:1228
void stop()
stop the Domain and its Message delivery functions
Definition: Domain.hpp:1236
void start(Node &node, size_t capacity=1024, time::Duration maxBlockingTime=time::Duration::seconds(1), uint64_t cpuAffinity=0)
start a Node within this Domain as a thread - handles its subscribing here too
Definition: Domain.hpp:951
void handleJustBytesCb(uint16_t tag, void const *bytes, app::hasMemoryAttachment *att)
Definition: Domain.hpp:269
size_t netRecvingPartyDetectedCount() const
how many network parties (processes) are ready to receive messages from this Domain ...
Definition: Domain.hpp:1072
void addPub()
configure the advertisement with message types directly This is when you do not want to involve a Nod...
Definition: Domain.hpp:935
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
helps allocate an object and its aggregated objects in a contiguous shared memory ...
Definition: Allocators.hpp:95
uint64_t clientData[2]
byte size of the above
Definition: Message.hpp:183
Definition: Message.hpp:430
internal use
Definition: Messages.hpp:42
Definition: Domain.hpp:117