#include "hmbdc/Copyright.hpp"
#include "hmbdc/app/StuckClientPurger.hpp"
#include "hmbdc/Config.hpp"
#include "hmbdc/numeric/BitMath.hpp"
#include "hmbdc/time/Time.hpp"

#include <boost/interprocess/allocators/allocator.hpp>

namespace hmbdc { namespace app {

namespace context_property {

/// Context template parameter indicating each message is sent to all
/// Clients within the Context
template <uint16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
struct broadcast {
    static_assert(max_parallel_consumer >= 4u);
    // ...
};

// ... (partition, pool and ipc related context properties elided)

} // namespace context_property
}} // namespace hmbdc::app
#include "hmbdc/app/ContextDetail.hpp"

namespace hmbdc { namespace app {

namespace context_detail {

HMBDC_CLASS_HAS_DECLARE(hmbdc_ctx_queued_ts);
HMBDC_CLASS_HAS_DECLARE(hmbdcIpcFrom);
HMBDC_CLASS_HAS_DECLARE(ibmaProc);
/// covers the inter-thread and ipc communication facade
template <size_t MaxMessageSize, typename... ContextProperties>
struct ThreadCommBase {
    // ... (cpa, the aggregate of the ContextProperties, and the Buffer
    // type derived from it are elided)
    using Allocator = typename cpa::Allocator;

    enum {
        MAX_MESSAGE_SIZE = MaxMessageSize,
        BUFFER_VALUE_SIZE = MaxMessageSize + sizeof(MessageHead),
        // ...
    };

    size_t maxMessageSize() const {
        if (MaxMessageSize == 0) return maxMessageSizeRuntime_;
        return MaxMessageSize;
    }
    /// send a batch of messages to the Context or attached ipc Contexts;
    /// blocks until buffer space is available
    template <MessageC M0, MessageC M1, typename ... Messages
        , typename Enabled = typename std::enable_if<
            !std::is_integral<M1>::value, void>::type>
    void send(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.claim(n);
        sendRecursive(it, std::forward<M0>(m0)
            , std::forward<M1>(m1), std::forward<Messages>(msgs)...);
        buffer_.commit(it, n);
    }
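    /**
     * Usage sketch (Tick and Book are hypothetical message types, not part
     * of this header):
     * @code
     * struct Tick : hmbdc::app::hasTag<1001> { long px; };
     * struct Book : hmbdc::app::hasTag<1002> { long bid, ask; };
     * Tick t; t.px = 42;
     * Book b; b.bid = 41; b.ask = 43;
     * ctx.send(t, b); // both messages claimed and committed as one batch
     * @endcode
     */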
    /// try to send a batch of messages to the Context or attached ipc
    /// Contexts; returns false instead of blocking when the buffer is full
    template <MessageC M0, MessageC M1, typename ... Messages
        , typename Enabled = typename std::enable_if<
            !std::is_integral<M1>::value, void>::type>
    bool trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
        auto n = sizeof...(msgs) + 2;
        auto it = buffer_.tryClaim(n);
        if (hmbdc_unlikely(!it)) return false;
        sendRecursive(it, std::forward<M0>(m0)
            , std::forward<M1>(m1), std::forward<Messages>(msgs)...);
        buffer_.commit(it, n);
        return true;
    }
    /// send a range of messages to the Context or attached ipc Contexts
    template <MessageForwardIterC ForwardIt>
    void send(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.claim(n);
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename std::iterator_traits<ForwardIt>::value_type;
                static_assert(std::is_trivially_destructible<Message>::value
                    , "cannot send message with dtor");
                static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
                    , "hasMemoryAttachment Messages cannot be sent in group");
                auto wrap = new (*it++) MessageWrap<Message>(*begin++);
                if constexpr (has_hmbdc_ctx_queued_ts<Message>::value) {
                    wrap->template get<Message>().hmbdc_ctx_queued_ts
                        = hmbdc::time::SysTime::now();
                }
            }
            buffer_.commit(bit, n);
        }
    }
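    /**
     * Usage sketch (Tick as in the earlier sketch; illustration only):
     * @code
     * std::vector<Tick> ticks(8);
     * ctx.send(ticks.begin(), ticks.size()); // one claim/commit covers all 8
     * @endcode
     */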
    /// try to send a range of messages to the Context or attached ipc
    /// Contexts; returns false instead of blocking when the buffer is full
    template <MessageForwardIterC ForwardIt>
    bool trySend(ForwardIt begin, size_t n) {
        if (hmbdc_likely(n)) {
            auto bit = buffer_.tryClaim(n);
            if (hmbdc_unlikely(!bit)) return false;
            auto it = bit;
            for (auto i = 0ul; i < n; i++) {
                using Message = typename std::iterator_traits<ForwardIt>::value_type;
                static_assert(std::is_trivially_destructible<Message>::value
                    , "cannot send message with dtor");
                static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
                    , "hasMemoryAttachment Messages cannot be sent in group");
                auto wrap = new (*it++) MessageWrap<Message>(*begin++);
                if constexpr (has_hmbdc_ctx_queued_ts<Message>::value) {
                    wrap->template get<Message>().hmbdc_ctx_queued_ts
                        = hmbdc::time::SysTime::now();
                }
            }
            buffer_.commit(bit, n);
        }
        return true;
    }
    /// send a message, including a hasMemoryAttachment message, to the
    /// Context or attached ipc Contexts
    template <MessageC Message>
    void send(Message&& m) {
        using M = typename std::decay<Message>::type;
        static_assert(std::is_trivially_destructible<M>::value
            , "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if constexpr(!std::is_base_of<hasMemoryAttachment, M>::value) {
            if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0
                && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
                HMBDC_THROW(std::out_of_range
                    , "message too big, typeTag=" << m.getTypeTag());
            }
            if constexpr (has_hmbdc_ctx_queued_ts<M>::value) {
                auto it = buffer_.claim();
                auto wrap = new (*it) MessageWrap<M>(std::forward<Message>(m));
                wrap->template get<M>().hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
                buffer_.commit(it);
            } else {
                buffer_.put(MessageWrap<M>(std::forward<Message>(m)));
            }
        } else {
            if constexpr (cpa::ipc && has_hmbdcShmRefCount<M>::value) {
                // 0cpy path: the attachment already lives in the shm att
                // pool, so only the converted handle is queued
                if (m.template holdShmHandle<M>()) {
                    if (0 >= m.hmbdcShmRefCount) {
                        // ...
                    }
                    auto it = buffer_.claim(1);
                    auto wrap = new (*it) MessageWrap<M>(std::forward<Message>(m));
                    wrap->payload.shmConvert(*shmAttAllocator_);
                    if constexpr (has_hmbdc_ctx_queued_ts<M>::value) {
                        wrap->template get<M>().hmbdc_ctx_queued_ts
                            = hmbdc::time::SysTime::now();
                    }
                    buffer_.commit(it, 1);
                    m.hasMemoryAttachment::release();
                    return;
                }
            }
            if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0
                && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
                HMBDC_THROW(std::out_of_range
                    , "message too big, typeTag=" << m.getTypeTag());
            }
            // in-band path: a head message followed by the attachment bytes
            // chopped into buffer-slot sized segments
            auto& att = static_cast<hasMemoryAttachment&>(m);
            size_t segSize = buffer_.maxItemSize() - sizeof(MessageHead);
            auto n = (att.len + segSize - 1) / segSize + 1;
            if (hmbdc_unlikely(n > buffer_.capacity())) {
                HMBDC_THROW(std::out_of_range
                    , "hasMemoryAttachment message too big, typeTag=" << m.getTypeTag());
            }
            auto bit = buffer_.claim(n);
            auto it = bit;
            auto wrap = new (*it++)
                MessageWrap<InBandHasMemoryAttachment<M>>(std::forward<Message>(m));
            if constexpr (has_hmbdc_ctx_queued_ts<M>::value) {
                wrap->template get<M>().hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
            }
            auto segStart = (char*)att.attachment;
            auto remaining = (size_t)att.len;
            while (remaining) {
                auto wrap = new (*it++) MessageWrap<InBandMemorySeg>();
                auto& ms = wrap->get<InBandMemorySeg>();
                auto bytes = std::min(segSize, (size_t)remaining);
                wrap->scratchpad().ipc.inbandPayloadLen = bytes;
                memcpy(ms.seg, segStart, bytes);
                segStart += bytes;
                remaining -= bytes;
            }
            buffer_.commit(bit, it - bit);
        }
    }
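    /**
     * Usage sketch (Blob is a hypothetical attachment-carrying type;
     * illustration only):
     * @code
     * struct Blob : hmbdc::app::hasMemoryAttachment, hmbdc::app::hasTag<1003> {};
     * Blob b;
     * b.attachment = ::malloc(1u << 20);
     * b.len = 1u << 20;
     * ctx.send(b); // the 1MB payload travels in-band as segments
     * @endcode
     */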
    /// preallocate n consecutive buffer slots so they can be filled and
    /// later sent out via commitForSend()
    template <MessageC Message>
    auto allocateForSend(size_t n) {
        struct IteratorAdaptor {
            typename Buffer::iterator it;
            Message& operator*() {
                auto wrap = (MessageWrap<Message>*)(*it);
                return wrap->payload;
            }
            auto& operator++() { ++it; return *this; }
            auto operator++(int) {
                auto tmp = IteratorAdaptor{it++};
                return tmp;
            }
        };
        auto it = buffer_.claim(n);
        auto res = IteratorAdaptor{it};
        // ... (each claimed slot is placement-constructed as a
        // MessageWrap<Message> before res is returned)
        return res;
    }

    /// commit all the filled-up slots allocated by allocateForSend and
    /// send them out
    template <typename IteratorAdaptor>
    void commitForSend(IteratorAdaptor itA) {
        buffer_.commit(itA.it);
    }
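    /**
     * Usage sketch (illustration only): fill preallocated slots in place,
     * then commit them in one shot:
     * @code
     * auto out = ctx.allocateForSend<Tick>(4);
     * for (int i = 0; i < 4; ++i) {
     *     (*out).px = i;
     *     ++out;
     * }
     * ctx.commitForSend(out);
     * @endcode
     */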
    /// try to send a message, including a hasMemoryAttachment message, to
    /// the Context or attached ipc Contexts if it wouldn't block
    template <MessageC Message>
    bool trySend(Message&& m) {
        using M = typename std::decay<Message>::type;
        static_assert(std::is_trivially_destructible<M>::value
            , "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0 || sizeof(MessageWrap<M>) <= BUFFER_VALUE_SIZE
            , "message too big");
        if constexpr(!std::is_base_of<hasMemoryAttachment, M>::value) {
            if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0
                && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
                HMBDC_THROW(std::out_of_range, "message too big");
            }
            // ... (non-blocking counterpart of the plain send() fast path)
        } else {
            if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0
                && sizeof(MessageWrap<M>) > buffer_.maxItemSize())) {
                HMBDC_THROW(std::out_of_range
                    , "message too big, typeTag=" << m.getTypeTag());
            }
            auto& att = static_cast<hasMemoryAttachment&>(m);
            size_t segSize = buffer_.maxItemSize() - sizeof(MessageHead);
            auto n = (att.len + segSize - 1) / segSize + 1;
            if (hmbdc_unlikely(n > buffer_.capacity())) {
                HMBDC_THROW(std::out_of_range
                    , "hasMemoryAttachment message too big, typeTag=" << m.getTypeTag());
            }
            auto bit = buffer_.tryClaim(n);
            if (!bit) return false;
            auto it = bit;
            auto wrap = new (*it++)
                MessageWrap<InBandHasMemoryAttachment<M>>(std::forward<Message>(m));
            wrap->scratchpad().inbandUnderlyingTypeTag = m.getTypeTag();
            auto segStart = (char*)att.attachment;
            auto remaining = (size_t)att.len;
            while (remaining) {
                auto wrap = new (*it++) MessageWrap<InBandMemorySeg>();
                auto& ms = wrap->get<InBandMemorySeg>();
                auto bytes = std::min(segSize, (size_t)remaining);
                wrap->scratchpad().ipc.inbandPayloadLen = bytes;
                memcpy(ms.seg, segStart, bytes);
                segStart += bytes;
                remaining -= bytes;
            }
            buffer_.commit(bit, it - bit);
        }
        return true;
    }
    /// send a message to all Clients in the Context or attached ipc
    /// Contexts by constructing it directly in the buffer
    template <MessageC Message, typename ... Args>
    void sendInPlace(Args&&... args) {
        static_assert(!std::is_base_of<JustBytes, Message>::value
            , "use sendJustBytesInPlace");
        static_assert(std::is_trivially_destructible<Message>::value
            , "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0
            || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
            , "hasMemoryAttachment Messages cannot be sent in place");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0
            && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range
                , "message too big buffer_.maxItemSize()=" << buffer_.maxItemSize());
        }
        buffer_.template putInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }
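    /**
     * Usage sketch (illustration only): the message is constructed directly
     * in the buffer slot, avoiding an intermediate copy:
     * @code
     * ctx.sendInPlace<Tick>(); // ctor arguments forward to Tick in-buffer
     * @endcode
     */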
    /// send a message whose payload is raw bytes under a runtime tag
    /// (JustBytes), constructing it directly in the buffer
    template <typename JustBytesType, typename ... Args>
    void sendJustBytesInPlace(uint16_t tag, void const* bytes, size_t len
        , hasMemoryAttachment* att, Args&& ... args) {
        if (hmbdc_unlikely(len > maxMessageSize())) {
            HMBDC_THROW(std::out_of_range
                , "message too big, typeTag=" << tag);
        }
        if (!att || !cpa::ipc) {
            auto it = buffer_.claim();
            new (*it) MessageWrap<JustBytesType>(tag, bytes, len, att
                , std::forward<Args>(args)...);
            buffer_.commit(it);
        } else {
            size_t segSize = buffer_.maxItemSize() - sizeof(MessageHead);
            auto n = (att->len + segSize - 1) / segSize + 1;
            if (hmbdc_unlikely(n > buffer_.capacity())) {
                HMBDC_THROW(std::out_of_range
                    , "hasMemoryAttachment message too big, typeTag=" << tag);
            }
            auto bit = buffer_.claim(n);
            auto it = bit;
            auto wrap = new (*it++) MessageWrap<InBandHasMemoryAttachment<JustBytesType>>(
                tag, bytes, len, att, std::forward<Args>(args)...); (void)wrap;
            auto segStart = (char*)att->attachment;
            auto remaining = (size_t)att->len;
            while (remaining) {
                auto wrap = (new (*it++) MessageWrap<InBandMemorySeg>());
                auto& ms = wrap->get<InBandMemorySeg>();
                auto bytes = std::min(segSize, (size_t)remaining);
                wrap->scratchpad().ipc.inbandPayloadLen = bytes;
                memcpy(ms.seg, segStart, bytes);
                segStart += bytes;
                remaining -= bytes;
            }
            buffer_.commit(bit, it - bit);
        }
    }
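    /**
     * Usage sketch (illustration only), e.g. when bridging bytes received
     * from a network transport back into a Context:
     * @code
     * char payload[64] = {};
     * ctx.sendJustBytesInPlace<JustBytes>(1001, payload, sizeof(payload), nullptr);
     * @endcode
     */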
    /// try to send a message to all Clients in the Context or attached ipc
    /// Contexts if it wouldn't block; returns false otherwise
    template <MessageC Message, typename ... Args>
    bool trySendInPlace(Args&&... args) {
        static_assert(std::is_trivially_destructible<Message>::value
            , "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0
            || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
            , "hasMemoryAttachment Messages cannot be sent in place");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0
            && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        return buffer_.template tryPutInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
    }
    /// how many dispatching threads have started in this Context
    size_t dispatchingStartedCount() const {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        return *pDispStartCount_;
    }
    /// allocate a trivially destructible T in the ipc shm attachment pool;
    /// the returned shared_ptr hands the memory back to the pool when the
    /// 0cpy reference count drops to zero
    template <typename T, typename ...Args>
    std::shared_ptr<T> allocateInShm(size_t actualSize, Args&& ...args) {
        static_assert(std::is_trivially_destructible<T>::value);
        auto ptr = shmAttAllocator_->allocate(actualSize);
        auto ptrT = new (ptr) T{std::forward<Args>(args)...};
        return std::shared_ptr<T>(ptrT
            , [this](T* t) {
                if (0 == __atomic_sub_fetch(&t->hmbdc0cpyShmRefCount, 1, __ATOMIC_RELEASE)) {
                    shmAttAllocator_->deallocate((uint8_t*)t);
                }
            });
    }
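    /**
     * Usage sketch (Frame is a hypothetical 0cpy type carrying an
     * hmbdc0cpyShmRefCount member; illustration only):
     * @code
     * auto frame = ctx.allocateInShm<Frame>(sizeof(Frame));
     * frame->seq = 7; // fill in place; ipc receivers see it without a copy
     * @endcode
     */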
    template <typename BoolLvOrRv>
    ThreadCommBase(uint32_t messageQueueSizePower2Num
        , size_t maxMessageSizeRuntime
        , char const* shmName
        , size_t ipcTransportDeviceOffset
        , BoolLvOrRv&& tryOwn
        , size_t ipcShmForAttPoolSize)
    : allocator_(/* ... */
        , Buffer::footprint(maxMessageSizeRuntime + sizeof(MessageHead)
            , messageQueueSizePower2Num) + SMP_CACHE_BYTES + sizeof(*pDispStartCount_)
        /* ... */)
    , pDispStartCount_(allocator_.template allocate<size_t>(SMP_CACHE_BYTES, 0))
    , bufferptr_(allocator_.template allocate<Buffer>(SMP_CACHE_BYTES
        , maxMessageSizeRuntime + sizeof(MessageHead), messageQueueSizePower2Num
        /* ... */))
    , buffer_(*bufferptr_) {
        if (messageQueueSizePower2Num < 2) {
            HMBDC_THROW(std::out_of_range
                , "messageQueueSizePower2Num need >= 2");
        }
        if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
            HMBDC_THROW(std::out_of_range
                , "can only set maxMessageSizeRuntime when template value MaxMessageSize is 0");
        }
        maxMessageSizeRuntime_ = maxMessageSizeRuntime;
        if (((cpa::ipc && tryOwn) || !cpa::ipc) && cpa::has_pool) {
            markDeadFrom(buffer_, 0);
        }
        if (cpa::ipc && ipcShmForAttPoolSize) {
            using namespace boost::interprocess;
            auto name = std::string(shmName) + "-att-pool";
            if (tryOwn) {
                shm_unlink(name.c_str());
                shmAttAllocator_.emplace(
                    tryOwn, create_only, name.c_str(), ipcShmForAttPoolSize);
            } else {
                shmAttAllocator_.emplace(tryOwn, open_only, name.c_str());
            }
        }
    }

    ~ThreadCommBase() {
        allocator_.unallocate(bufferptr_);
    }
    static void markDeadFrom(pattern::MonoLockFreeBuffer&, uint16_t) {
        // a MonoLockFreeBuffer has no parallel consumer slots to mark
    }

    template <typename BroadCastBuf>
    static void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
        for (uint16_t i = poolThreadCount;
            i < BroadCastBuf::max_parallel_consumer;
            ++i) {
            buffer.markDead(i);
        }
    }

    static void markDead(pattern::MonoLockFreeBuffer&, std::list<uint16_t>) {
        // a MonoLockFreeBuffer has no parallel consumer slots to mark
    }

    template <typename BroadCastBuf>
    static void markDead(BroadCastBuf& buffer, std::list<uint16_t> slots) {
        for (auto s : slots) {
            buffer.markDead(s);
        }
    }

    Allocator allocator_;
    size_t* pDispStartCount_;
    Buffer* HMBDC_RESTRICT bufferptr_;
    Buffer& HMBDC_RESTRICT buffer_;
    struct ShmAttAllocator {
        template <typename Arg, typename ...Args>
        ShmAttAllocator(bool own, Arg&& arg, char const* name, Args&& ... args)
        : managedShm_(std::forward<Arg>(arg), name, std::forward<Args>(args)...) {
            if (own) nameUnlink_ = name;
        }

        ~ShmAttAllocator() {
            if (nameUnlink_.size()) {
                shm_unlink(nameUnlink_.c_str());
            }
        }

        boost::interprocess::managed_shared_memory::handle_t
        getHandle(void* localAddr) const {
            return managedShm_.get_handle_from_address(localAddr);
        }

        uint8_t*
        getAddr(boost::interprocess::managed_shared_memory::handle_t h) const {
            return (uint8_t*)managedShm_.get_address_from_handle(h);
        }

        uint8_t* allocate(size_t len) {
            auto res = (uint8_t*)managedShm_.allocate(len);
            return res;
        }

        auto deallocate(uint8_t* p) {
            return managedShm_.deallocate(p);
        }

        boost::interprocess::managed_shared_memory managedShm_;
        std::string nameUnlink_;
    };
    std::optional<ShmAttAllocator> shmAttAllocator_;
    template <typename M, typename... Messages>
    void sendRecursive(typename Buffer::iterator it
        , M&& msg, Messages&&... msgs) {
        using Message = typename std::decay<M>::type;
        static_assert(std::is_trivially_destructible<Message>::value
            , "cannot send message with dtor");
        static_assert(MAX_MESSAGE_SIZE == 0
            || sizeof(MessageWrap<Message>) <= BUFFER_VALUE_SIZE
            , "message too big");
        static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
            , "hasMemoryAttachment Messages cannot be sent in group");
        if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0
            && sizeof(MessageWrap<Message>) > buffer_.maxItemSize())) {
            HMBDC_THROW(std::out_of_range, "message too big");
        }
        auto wrap = new (*it) MessageWrap<Message>(std::forward<M>(msg));
        if constexpr (has_hmbdc_ctx_queued_ts<Message>::value) {
            wrap->template get<Message>().hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
        }
        sendRecursive(++it, std::forward<Messages>(msgs)...);
    }
    void sendRecursive(typename Buffer::iterator) {}

    size_t maxMessageSizeRuntime_;
};

} // namespace context_detail
/// A Context is like a media object that facilitates the communications
/// for the Clients that it is holding
template <size_t MaxMessageSize = 0, typename... ContextProperties>
struct Context
: context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...> {
    using Base = context_detail::ThreadCommBase<MaxMessageSize, ContextProperties...>;
    using Buffer = typename Base::Buffer;
    using cpa = typename Base::cpa;
    using Pool = typename std::conditional<cpa::pool_msgless
        , PoolMinus, PoolT<Buffer>>::type;

    /// ctor for constructing a local (non-ipc) Context
    Context(uint32_t messageQueueSizePower2Num = MaxMessageSize?20:2
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize)
    : Base(messageQueueSizePower2Num < 2?2:messageQueueSizePower2Num
        , MaxMessageSize?MaxMessageSize:maxMessageSizeRuntime
        , nullptr, 0, false, 0)
    , usedHmbdcCapacity_(0)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0) {
        static_assert(!cpa::ipc, "no name specified for ipc Context");
    }
    /// ctor for constructing an ipc Context that can create or attach to a
    /// named ipc transport
    template <typename BoolRvOrLv
        , std::enable_if_t<std::is_same<bool
            , typename std::decay<BoolRvOrLv>::type>::value>* = nullptr>
    Context(BoolRvOrLv&& tryOwn
        , char const* ipcTransportName
        , uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
        , size_t maxPoolClientCount = MaxMessageSize?128:0
        , size_t maxMessageSizeRuntime = MaxMessageSize
        , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful
        , size_t ipcTransportDeviceOffset = 0
        , size_t ipcShmForAttPoolSize = 0)
    : Base(messageQueueSizePower2Num
        , MaxMessageSize?MaxMessageSize:maxMessageSizeRuntime
        , ipcTransportName
        , ipcTransportDeviceOffset, tryOwn, ipcShmForAttPoolSize)
    , usedHmbdcCapacity_(0)
    , pool_(createPool<cpa>(maxPoolClientCount))
    , poolThreadCount_(0)
    , secondsBetweenPurge_(60)
    , ifIpcCreator_(tryOwn)
    , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
        static_assert(cpa::ipc
            , "ctor can only be used with ipc turned on Context");
    }

    /// dtor
    ~Context() {
        // ...
        Base::markDeadFrom(this->buffer_, 0);
    }
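    /**
     * Usage sketch (illustration only; the ipc context property is declared
     * in the elided part of namespace context_property above):
     * @code
     * using IpcCtx = hmbdc::app::Context<0
     *     , hmbdc::app::context_property::ipc_enabled>;
     * IpcCtx creator(true, "my-ipc-ctx", 20, 0, 1000);  // creates the transport
     * IpcCtx attacher(false, "my-ipc-ctx", 20, 0, 1000); // attaches to it
     * @endcode
     */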
    /// add a Client to the Context's pool - the Client is run in pool mode
    template <typename Client>
    void addToPool(Client& client
        , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        if (std::is_base_of<single_thread_powered_client, Client>::value
            && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
            && poolThreadCount_ != 1) {
            HMBDC_THROW(std::out_of_range
                , "cannot add a single thread powered client to the non-single"
                  " thread powered pool without specifying a single thread poolThreadAffinity");
        }
        primeForShmAtt(client);
        // ... (client is wrapped into a pool consumer stub)
        pool_->addConsumer(*stub, poolThreadAffinityIn);
    }
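    /**
     * Usage sketch (Ticker is a hypothetical Client type; illustration only):
     * @code
     * struct Ticker : hmbdc::app::Client<Ticker, Tick> {
     *     void handleMessageCb(Tick const&) {}
     * };
     * Ticker ticker;
     * ctx.addToPool(ticker); // any pool thread may run ticker
     * ctx.start(2, 0x03ul);  // 2 pool threads pinned on cores 0 and 1
     * @endcode
     */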
    /// add a bunch of Clients to the Context's pool - the Clients are run
    /// in pool mode
    template <typename Client, typename ... Args>
    void addToPool(Client& client
        , uint64_t poolThreadAffinityIn, Args&& ...args) {
        addToPool(client, poolThreadAffinityIn);
        addToPool(std::forward<Args>(args)...);
    }

    /// add a bunch of Clients to the Context's pool - the Clients are run
    /// in pool mode
    template <typename Client, typename Client2, typename ... Args
        , typename Enabled = typename std::enable_if<
            !std::is_integral<Client2>::value, void>::type>
    void addToPool(Client& client, Client2& client2, Args&& ...args) {
        addToPool(client);
        addToPool(client2, std::forward<Args>(args)...);
    }
    /// return the number of Clients added into the pool
    size_t clientCountInPool() const {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        return pool_->consumerSize();
    }

    /// how many parallel consumers are alive
    size_t parallelConsumerAlive() const {
        return this->buffer_.parallelConsumerAlive();
    }
    /// start the Context by specifying what runs in it (Pool and/or direct
    /// Clients) and their paired-up cpu affinities
    template <typename ...Args>
    void start(Args&& ... args) {
        startWithContextProperty<cpa>(true, std::forward<Args>(args) ...);
    }

    /// same usage and parameters as start() except the Client threads are
    /// not started by the Context - drive them via runOnce() instead
    template <typename ...Args>
    void registerToRun(Args&& ... args) {
        startWithContextProperty<cpa>(false, std::forward<Args>(args) ...);
    }

    /// stop the message dispatching - asynchronously
    void stop() {
        stopWithContextProperty<cpa>();
    }

    /// wait until all threads (Pool threads too, if any) of the Context exit
    void join() {
        joinWithContextProperty<cpa>();
    }
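    /**
     * Usage sketch (illustration only): a typical lifecycle with one pool
     * and one directly powered Client (ticker as in the sketch above):
     * @code
     * ctx.start(2, 0x03ul       // 2 pool threads pinned to cores 0-1
     *     , ticker, 0x04ul);    // ticker gets its own thread on core 2
     * // ... run ...
     * ctx.stop();               // ask dispatching to wind down (async)
     * ctx.join();               // wait for all Context threads to exit
     * @endcode
     */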
    /// an ipc-creator Context runs a StuckClientPurger to purge crashed
    /// (or slow, stuck) Clients from the ipc transport; this sets the
    /// purge period
    void setSecondsBetweenPurge(uint32_t s) {
        secondsBetweenPurge_ = s;
    }
    /// normally not used unless you want to run your own message loop
    void runOnce(uint16_t threadSerialNumberInPool) {
        static_assert(cpa::has_pool, "pool is not supported in the Context type");
        pool_->runOnce(threadSerialNumberInPool);
    }
    /// normally not used unless you want to run your own message loop
    template <typename Client>
    bool runOnce(Client&& c) {
        uint16_t tn = cpa::broadcast_msg?c.hmbdcNumber:0;
        // ...
        return context_detail::runOnceImpl(tn
            , stopped_, this->buffer_, c);
    }
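    /**
     * Usage sketch (illustration only): drive a registered Client from a
     * caller-owned thread:
     * @code
     * ctx.registerToRun(ticker, 0x04ul); // slot reserved, no thread started
     * while (ctx.runOnce(ticker)) {
     *     // other per-iteration work owned by the caller
     * }
     * @endcode
     */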
private:
    template <typename cpa>
    typename std::enable_if<cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(this->buffer(), maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t maxPoolClientCount) {
        return Pool::create(maxPoolClientCount);
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
    createPool(size_t) {
        return typename Pool::ptr();
    }
    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    stopWithContextProperty() {
        if (pool_) pool_->stop();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    stopWithContextProperty() {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        stopped_ = true;
    }
    template <typename cpa>
    typename std::enable_if<cpa::has_pool, void>::type
    joinWithContextProperty() {
        if (pool_) pool_->join();
        for (auto& t : threads_) {
            t.join();
        }
    }

    template <typename cpa>
    typename std::enable_if<!cpa::has_pool, void>::type
    joinWithContextProperty() {
        for (auto& t : threads_) {
            t.join();
        }
    }
    template <typename cpa>
    void reserveSlots(std::list<uint16_t>&) {
    }
    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        auto available = this->buffer_.unusedConsumerIndexes();
        if (available.size() < poolThreadCount) {
            HMBDC_THROW(std::out_of_range
                , "Context remaining capacity = " << available.size()
                << ", consider increasing max_parallel_consumer");
        }
        for (uint16_t i = 0; i < poolThreadCount; ++i) {
            slots.push_back(available[i]);
            this->buffer_.reset(available[i]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }
    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg || cpa::pool_msgless, void>::type
    reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }
    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
        const bool clientParticipateInMessaging =
            std::decay<CcClient>::type::INTERESTS_SIZE != 0;
        if (clientParticipateInMessaging) {
            auto available = this->buffer_.unusedConsumerIndexes();
            if (!available.size()) {
                HMBDC_THROW(std::out_of_range
                    , "Context reached capacity, consider increasing max_parallel_consumer");
            }
            this->buffer_.reset(available[0]);
            slots.push_back(available[0]);
        }
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }
    template <typename cpa, typename CcClient, typename ...Args>
    typename std::enable_if<!cpa::broadcast_msg && !std::is_integral<CcClient>::value, void>::type
    reserveSlots(std::list<uint16_t>& slots, CcClient&, uint64_t, Args&& ... args) {
        reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
    }
    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::ipc, void>::type
    startWithContextProperty(bool kickoffThread, Args&& ... args) {
        auto& lock = this->allocator_.fileLock();
        std::lock_guard<decltype(lock)> g(lock);
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, args ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(kickoffThread, sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }
    template <typename cpa, typename ...Args>
    typename std::enable_if<!cpa::ipc, void>::type
    startWithContextProperty(bool kickoffThread, Args&& ... args) {
        std::list<uint16_t> slots;
        try {
            reserveSlots<cpa>(slots, args ...);
            auto sc = slots;
            startWithContextPropertyImpl<cpa>(kickoffThread, sc, std::forward<Args>(args) ...);
        } catch (std::out_of_range const&) {
            Base::markDead(this->buffer_, slots);
            throw;
        }
    }
    template <typename cpa>
    typename std::enable_if<cpa::broadcast_msg && cpa::ipc, void>::type
    startWithContextPropertyImpl(bool kickoffThread, std::list<uint16_t>& slots) {
        if (ifIpcCreator_ && !purger_ && secondsBetweenPurge_) {
            purger_.reset(
                new StuckClientPurger<Buffer>(secondsBetweenPurge_, this->buffer_));
            startWithContextPropertyImpl<cpa>(kickoffThread, slots
                , *purger_, purgerCpuAffinityMask_);
        }
    }
    template <typename cpa>
    typename std::enable_if<!cpa::broadcast_msg || !cpa::ipc, void>::type
    startWithContextPropertyImpl(bool, std::list<uint16_t>&) {
    }
    template <typename cpa, typename ...Args>
    typename std::enable_if<cpa::has_pool, void>::type
    startWithContextPropertyImpl(bool kickoffThread, std::list<uint16_t>& slots
        , uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
        , Args&& ... args) {
        using namespace std;
        if (poolThreadCount_) {
            HMBDC_THROW(std::out_of_range, "Context pool already started");
        }
        poolThreadCount_ = poolThreadCount;
        std::vector<uint16_t> sc(slots.begin(), slots.end());
        if (!poolThreadsCpuAffinityMask) {
            auto cpuCount = std::thread::hardware_concurrency();
            poolThreadsCpuAffinityMask =
                ((1ul << poolThreadCount) - 1u) << (hmbdcNumbers_.size() % cpuCount);
        }
        pool_->startAt(poolThreadCount, poolThreadsCpuAffinityMask, sc);
        while (poolThreadCount--) {
            if (!cpa::pool_msgless) {
                hmbdcNumbers_.push_back(*slots.begin());
                slots.pop_front();
            }
        }
        startWithContextPropertyImpl<cpa>(kickoffThread, slots, std::forward<Args>(args) ...);
    }
    template <typename cpa, typename Client, typename ...Args>
    typename std::enable_if<!std::is_integral<Client>::value, void>::type
    startWithContextPropertyImpl(bool kickoffThread, std::list<uint16_t>& slots
        , Client& c, uint64_t cpuAffinity
        , Args&& ... args) {
        auto clientParticipateInMessaging =
            std::decay<Client>::type::INTERESTS_SIZE;
        uint16_t hmbdcNumber = 0xffffu;
        if (clientParticipateInMessaging && cpa::broadcast_msg) {
            hmbdcNumber = *slots.begin();
            c.hmbdcNumber = hmbdcNumber;
            slots.pop_front();
        }
        if (kickoffThread) {
            auto thrd = kickOffClientThread(
                c, cpuAffinity, hmbdcNumber, hmbdcNumbers_.size());
            threads_.push_back(std::move(thrd));
        }
        hmbdcNumbers_.push_back(hmbdcNumber);
        startWithContextPropertyImpl<cpa>(kickoffThread, slots, std::forward<Args>(args) ...);
    }
    template <typename Client>
    auto kickOffClientThread(
        Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
        auto thrd = std::thread([this, &c, mask, h = hmbdcNumber
            , threadSerialNumber]() {
            auto hmbdcNumber = h;
            std::string name;
            char const* schedule;
            int priority;
            auto clientParticipateInMessaging =
                std::decay<Client>::type::INTERESTS_SIZE;
            if (clientParticipateInMessaging) {
                name = "hmbdc" + std::to_string(hmbdcNumber);
            } else {
                name = c.hmbdcName();
            }
            auto cpuAffinityMask = mask;
            std::tie(schedule, priority) = c.schedSpec();
            if (!schedule) schedule = "SCHED_OTHER";
            if (!cpuAffinityMask) {
                auto cpuCount = std::thread::hardware_concurrency();
                cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
            }
            hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
                , schedule, priority);
            hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
            __atomic_add_fetch(this->pDispStartCount_, 1, __ATOMIC_RELEASE);
            try {
                c.messageDispatchingStartedCb(this->pDispStartCount_);
            } catch (std::exception const& e) {
                c.stopped(e);
            } catch (int code) {
                // ...
            }
            while (context_detail::runOnceImpl(
                hmbdcNumber, this->stopped_, this->buffer_, c)) {
            }
            if (this->stopped_) {
                if (clientParticipateInMessaging) {
                    // drain whatever is left unconsumed for this slot
                    typename Buffer::iterator begin, end;
                    size_t count;
                    if constexpr (cpa::broadcast_msg) {
                        count = this->buffer_.peek(hmbdcNumber, begin, end);
                        this->buffer_.wasteAfterPeek(hmbdcNumber, count);
                    } else {
                        count = this->buffer_.peek(begin, end);
                        this->buffer_.wasteAfterPeek(begin, count);
                    }
                }
            }
            if (clientParticipateInMessaging) {
                context_detail::unblock(this->buffer_, hmbdcNumber);
            }
        });
        return thrd;
    }
    template <typename Client>
    void primeForShmAtt(Client& c) {
        if constexpr (cpa::ipc && context_detail::has_ibmaProc<Client>::value) {
            if constexpr(!std::is_same<std::nullptr_t, decltype(c.ibmaProc)>::value) {
                if (!c.ibmaProc.hmbdcShmHandleToAddr) {
                    c.ibmaProc.hmbdcShmHandleToAddr = [&alloc = this->shmAttAllocator_]
                        (boost::interprocess::managed_shared_memory::handle_t h) {
                        return alloc->getAddr(h);
                    };
                    c.ibmaProc.hmbdcShmDeallocator = [&alloc = this->shmAttAllocator_]
                        (uint8_t* addr) {
                        return alloc->deallocate(addr);
                    };
                }
            }
        }
    }
    Context(Context const&) = delete;
    Context& operator = (Context const&) = delete;

    uint16_t usedHmbdcCapacity_;
    std::vector<uint16_t> hmbdcNumbers_;
    bool stopped_ = false;
    typename Pool::ptr pool_;
    using Threads = std::vector<std::thread>;
    Threads threads_;
    size_t poolThreadCount_;
    uint32_t secondsBetweenPurge_;
    bool const ifIpcCreator_ = false;
    uint64_t purgerCpuAffinityMask_;
    typename std::conditional<cpa::broadcast_msg && cpa::ipc
        , std::unique_ptr<StuckClientPurger<Buffer>>, uint32_t
        >::type purger_;
};

}} // namespace hmbdc::app