1 #include "hmbdc/Copyright.hpp" 5 #include "hmbdc/app/StuckClientPurger.hpp" 6 #include "hmbdc/app/RequestReply.hpp" 7 #include "hmbdc/Config.hpp" 8 #include "hmbdc/numeric/BitMath.hpp" 9 #include "hmbdc/time/Time.hpp" 16 namespace hmbdc {
namespace app {
30 namespace context_property {
56 template <u
int16_t max_parallel_consumer = DEFAULT_HMBDC_CAPACITY>
58 static_assert(max_parallel_consumer >= 4u
128 #include "hmbdc/app/ContextDetail.hpp" 129 namespace hmbdc {
namespace app {
131 namespace context_detail {
145 template <
size_t MaxMessageSize,
typename... ContextProperties>
153 MAX_MESSAGE_SIZE = MaxMessageSize,
154 BUFFER_VALUE_SIZE = MaxMessageSize + 8u,
157 size_t maxMessageSize()
const {
158 if (MaxMessageSize == 0)
return maxMessageSizeRuntime_;
159 return MaxMessageSize;
170 template <MessageC M0, MessageC M1,
typename ... Messages,
typename Enabled
171 =
typename std::enable_if<!std::is_integral<M1>::value,
void>::type>
173 send(M0&& m0, M1&& m1, Messages&&... msgs) {
174 auto n =
sizeof...(msgs) + 2;
175 auto it = buffer_.claim(n);
176 sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
177 buffer_.commit(it, n);
190 template <MessageC M0, MessageC M1,
typename ... Messages,
typename Enabled
191 =
typename std::enable_if<!std::is_integral<M1>::value,
void>::type>
193 trySend(M0&& m0, M1&& m1, Messages&&... msgs) {
194 auto n =
sizeof...(msgs) + 2;
195 auto it = buffer_.tryClaim(n);
197 sendRecursive(it, std::forward<M0>(m0), std::forward<M1>(m1), std::forward<Messages>(msgs)...);
198 buffer_.commit(it, n);
213 template <MessageForwardIterC ForwardIt>
215 send(ForwardIt begin,
size_t n) {
216 if (hmbdc_likely(n)) {
217 auto bit = buffer_.claim(n);
219 for (
auto i = 0ul; i < n; i++) {
220 using Message =
typename iterator_traits<ForwardIt>::value_type;
221 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
224 buffer_.commit(bit, n);
236 template <MessageForwardIterC ForwardIt>
239 if (hmbdc_likely(n)) {
240 auto bit = buffer_.tryClaim(n);
241 if (hmbdc_unlikely(!bit))
return false;
243 for (
auto i = 0ul; i < n; i++) {
244 using Message =
typename iterator_traits<ForwardIt>::value_type;
245 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
248 buffer_.commit(bit, n);
261 template <MessageC Message>
263 using M =
typename std::decay<Message>::type;
264 static_assert(std::is_trivially_destructible<M>::value,
"cannot send message with dtor");
265 static_assert(MAX_MESSAGE_SIZE == 0 ||
sizeof(
MessageWrap<M>) <= BUFFER_VALUE_SIZE
266 ,
"message too big");
267 if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 &&
sizeof(
MessageWrap<M>) > buffer_.maxItemSize())) {
268 HMBDC_THROW(std::out_of_range,
"message too big");
282 template <MessageC Message>
284 using M =
typename std::decay<Message>::type;
285 static_assert(std::is_trivially_destructible<M>::value,
"cannot send message with dtor");
286 static_assert(MAX_MESSAGE_SIZE == 0 ||
sizeof(
MessageWrap<M>) <= BUFFER_VALUE_SIZE
287 ,
"message too big");
288 if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 &&
sizeof(
MessageWrap<M>) > buffer_.maxItemSize())) {
289 HMBDC_THROW(std::out_of_range,
"message too big");
303 template <MessageC Message,
typename ... Args>
305 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
307 ,
"message too big");
308 if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 &&
sizeof(
MessageWrap<Message>) > buffer_.maxItemSize())) {
309 HMBDC_THROW(std::out_of_range,
"message too big");
311 buffer_.template putInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
326 template <MessageC Message,
typename ... Args>
328 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
330 ,
"message too big");
331 if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 &&
sizeof(
MessageWrap<Message>) > buffer_.maxItemSize())) {
332 HMBDC_THROW(std::out_of_range,
"message too big");
334 return buffer_.template tryPutInPlace<MessageWrap<Message>>(std::forward<Args>(args)...);
348 ,
size_t maxMessageSizeRuntime
349 ,
char const* shmName
353 , Buffer::footprint(maxMessageSizeRuntime + 8u
354 , messageQueueSizePower2Num)
355 , O_RDWR | O_SYNC | (cpa::create_ipc?O_CREAT:0)
357 , bufferptr_(allocator_.template allocate<Buffer>(SMP_CACHE_BYTES
358 , maxMessageSizeRuntime + 8u, messageQueueSizePower2Num
361 , buffer_(*bufferptr_) {
362 if (messageQueueSizePower2Num < 2) {
363 HMBDC_THROW(std::out_of_range
364 ,
"messageQueueSizePower2Num need >= 2");
366 if (MaxMessageSize && maxMessageSizeRuntime != MAX_MESSAGE_SIZE) {
367 HMBDC_THROW(std::out_of_range
368 ,
"can only set maxMessageSizeRuntime when template value MaxMessageSize is 0");
370 maxMessageSizeRuntime_ = maxMessageSizeRuntime;
371 primeBuffer<(cpa::create_ipc || (!cpa::create_ipc && !cpa::attach_ipc)) && cpa::has_pool>();
372 if (cpa::create_ipc || cpa::attach_ipc) {
378 allocator_.unallocate(bufferptr_);
386 template <
typename BroadCastBuf>
388 void markDeadFrom(BroadCastBuf& buffer, uint16_t poolThreadCount) {
389 for (uint16_t i = poolThreadCount;
390 i < BroadCastBuf::max_parallel_consumer;
402 template <
typename BroadCastBuf>
404 void markDead(BroadCastBuf& buffer, std::list<uint16_t>slots) {
405 for (
auto s : slots) {
410 Allocator allocator_;
411 Buffer* HMBDC_RESTRICT bufferptr_;
412 Buffer& HMBDC_RESTRICT buffer_;
416 typename std::enable_if<doIt, void>::type
418 markDeadFrom(buffer_, 0);
422 typename std::enable_if<!doIt, void>::type
426 template <
typename M,
typename... Messages>
427 void sendRecursive(
typename Buffer::iterator it
428 , M&& msg, Messages&&... msgs) {
429 using Message =
typename std::decay<M>::type;
430 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
432 ,
"message too big");
433 if (hmbdc_unlikely(MAX_MESSAGE_SIZE == 0 &&
sizeof(
MessageWrap<Message>) > buffer_.maxItemSize())) {
434 HMBDC_THROW(std::out_of_range,
"message too big");
437 sendRecursive(++it, std::forward<M>(msgs)...);
439 void sendRecursive(
typename Buffer::iterator) {}
441 size_t maxMessageSizeRuntime_;
476 template <
size_t MaxMessageSize = 0,
typename... ContextProperties>
480 using Buffer =
typename Base::Buffer;
482 using Pool =
typename std::conditional<cpa::pool_msgless
495 Context(uint32_t messageQueueSizePower2Num = MaxMessageSize?20:2
496 ,
size_t maxPoolClientCount = MaxMessageSize?128:0
497 ,
size_t maxMessageSizeRuntime = MaxMessageSize
498 ,
size_t maxThreadSerialNumber = 64)
499 :
Base(messageQueueSizePower2Num < 2?2:messageQueueSizePower2Num
500 , maxMessageSizeRuntime, nullptr, 0)
501 , usedHmbdcCapacity_(0)
503 , pool_(createPool<
cpa>(maxPoolClientCount))
504 , poolThreadCount_(0) {
505 static_assert(!cpa::create_ipc && !cpa::attach_ipc
506 ,
"no name specified for ipc Context");
527 , uint32_t messageQueueSizePower2Num = MaxMessageSize?20:0
528 ,
size_t maxPoolClientCount = MaxMessageSize?128:0
529 ,
size_t maxMessageSizeRuntime = MaxMessageSize
530 , uint64_t purgerCpuAffinityMask = 0xfffffffffffffffful
531 ,
size_t maxThreadSerialNumber = 64
532 ,
size_t ipcTransportDeviceOffset = 0)
533 :
Base(messageQueueSizePower2Num, maxMessageSizeRuntime, ipcTransportName, ipcTransportDeviceOffset)
534 , usedHmbdcCapacity_(0)
536 , pool_(createPool<
cpa>(maxPoolClientCount))
537 , poolThreadCount_(0)
538 , secondsBetweenPurge_(60)
539 , purgerCpuAffinityMask_(purgerCpuAffinityMask) {
540 static_assert(cpa::create_ipc || cpa::attach_ipc
541 ,
"ctor can only be used with ipc turned on Context");
542 static_assert(!(cpa::create_ipc && cpa::attach_ipc)
543 ,
"Context cannot be both ipc_creator and ipc_attacher");
552 if (cpa::create_ipc) {
553 Base::markDeadFrom(this->buffer_, 0);
574 template <
typename Client>
576 , uint64_t poolThreadAffinityIn = 0xfffffffffffffffful) {
577 static_assert(cpa::has_pool,
"pool is not support in the Context type");
578 static_assert(std::tuple_size<typename Client::Replies>::value == 0
579 ,
"cannot add Client that blocks into the pool");
580 if (std::is_base_of<single_thread_powered_client, Client>::value
581 && hmbdc::numeric::setBitsCount(poolThreadAffinityIn) != 1
582 && poolThreadCount_ != 1) {
583 HMBDC_THROW(std::out_of_range
584 ,
"cannot add a single thread powered client to the non-single" 585 "thread powered pool without specifying a single thread poolThreadAffinity" 589 pool_->addConsumer(*stub, poolThreadAffinityIn);
608 template <
typename Client,
typename ... Args>
610 , uint64_t poolThreadAffinityIn, Args&& ...args) {
611 addToPool(client, poolThreadAffinityIn);
612 addToPool(std::forward<Args>(args)...);
627 template <
typename Client,
typename Client2,
typename ... Args,
typename Enabled
628 =
typename std::enable_if<!std::is_integral<Client2>::value,
void>::type>
632 addToPool(client2, std::forward<Args>(args)...);
641 static_assert(cpa::has_pool,
"pool is not support in the Context type");
642 return pool_->consumerSize();
652 return this->buffer_.parallelConsumerAlive();
686 template <
typename ...Args>
689 startWithContextProperty<cpa>(std::forward<Args>(args) ...);
699 stopWithContextProperty<cpa>();
708 joinWithContextProperty<cpa>();
724 secondsBetweenPurge_ = s;
736 static_assert(cpa::has_pool,
"pool is not support in the Context type");
737 pool_->runOnce(threadSerialNumberInPool);
747 template <
typename Client>
751 uint16_t tn = cpa::broadcast_msg?hmbdcNumbers_[threadSerialNumber]:threadSerialNumber;
753 context_detail::runOnceImpl(tn
754 , stopped_, this->buffer_
786 template <RequestC Request,
typename ReplyHandler>
790 static_assert(cpa::has_pool && !cpa::pool_msgless
791 ,
"not supported - needs to have a pool");
810 template <ReplyC Reply>
813 static_assert(cpa::has_pool && !cpa::pool_msgless
814 ,
"not supported - needs to have a pool");
815 this->send(std::forward<Reply>(reply));
819 template <
typename cpa>
820 typename std::enable_if<cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
821 createPool(
size_t maxPoolClientCount) {
822 return Pool::create(this->buffer(), maxPoolClientCount);
825 template <
typename cpa>
826 typename std::enable_if<cpa::pool_msgless, typename Pool::ptr>::type
827 createPool(
size_t maxPoolClientCount) {
828 return Pool::create(maxPoolClientCount);
831 template <
typename cpa>
832 typename std::enable_if<!cpa::has_pool && !cpa::pool_msgless, typename Pool::ptr>::type
834 return typename Pool::ptr();
837 template <
typename cpa>
838 typename std::enable_if<cpa::has_pool, void>::type
839 stopWithContextProperty() {
840 if (pool_) pool_->stop();
841 __atomic_thread_fence(__ATOMIC_ACQUIRE);
845 template <
typename cpa>
846 typename std::enable_if<!cpa::has_pool, void>::type
847 stopWithContextProperty() {
848 __atomic_thread_fence(__ATOMIC_ACQUIRE);
852 template <
typename cpa>
853 typename std::enable_if<cpa::has_pool, void>::type
854 joinWithContextProperty() {
855 if (pool_) pool_->join();
856 for (
auto& t : threads_) {
862 template <
typename cpa>
863 typename std::enable_if<!cpa::has_pool, void>::type
864 joinWithContextProperty() {
865 for (
auto& t : threads_) {
871 template <
typename cpa>
873 reserveSlots(std::list<uint16_t>&) {
876 template <
typename cpa,
typename ...Args>
877 typename std::enable_if<cpa::broadcast_msg && !cpa::pool_msgless, void>::type
878 reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
879 auto available = this->buffer_.unusedConsumerIndexes();
880 if (available.size() < poolThreadCount) {
881 HMBDC_THROW(std::out_of_range
882 ,
"Context remaining capacilty = " << available.size()
883 <<
", consider increasing max_parallel_consumer");
885 for (uint16_t i = 0; i < poolThreadCount; ++i) {
886 slots.push_back(available[i]);
887 this->buffer_.reset(available[i]);
889 reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
892 template <
typename cpa,
typename ...Args>
893 typename std::enable_if<!cpa::broadcast_msg || cpa::pool_msgless, void>::type
894 reserveSlots(std::list<uint16_t>& slots, uint16_t poolThreadCount, uint64_t, Args&& ... args) {
895 reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
898 template <
typename cpa,
typename CcClient,
typename ...Args>
899 typename std::enable_if<cpa::broadcast_msg && !std::is_integral<CcClient>::value,
void>::type
900 reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
901 const bool clientParticipateInMessaging =
902 std::decay<CcClient>::type::INTERESTS_SIZE != 0;
903 if (clientParticipateInMessaging) {
904 auto available = this->buffer_.unusedConsumerIndexes();
905 if (!available.size()) {
906 HMBDC_THROW(std::out_of_range
907 ,
"Context reached capacity, consider increasing max_parallel_consumer");
909 this->buffer_.reset(available[0]);
910 slots.push_back(available[0]);
912 reserveSlots<cpa>(slots, std::forward<Args>(args) ...);
915 template <
typename cpa,
typename CcClient,
typename ...Args>
916 typename std::enable_if<!cpa::broadcast_msg && !std::is_integral<CcClient>::value,
void>::type
917 reserveSlots(std::list<uint16_t>& slots, CcClient& c, uint64_t, Args&& ... args) {
920 template <
typename cpa,
typename ...Args>
921 typename std::enable_if<cpa::create_ipc || cpa::attach_ipc, void>::type
922 startWithContextProperty(Args&& ... args) {
923 auto& lock = this->allocator_.fileLock();
924 std::lock_guard<decltype(lock)> g(lock);
925 std::list<uint16_t> slots;
927 reserveSlots<cpa>(slots, args ...);
929 startWithContextPropertyImpl<cpa>(sc, std::forward<Args>(args) ...);
930 }
catch (std::out_of_range
const&) {
931 Base::markDead(this->buffer_, slots);
936 template <
typename cpa,
typename ...Args>
937 typename std::enable_if<!cpa::create_ipc && !cpa::attach_ipc, void>::type
938 startWithContextProperty(Args&& ... args) {
939 std::list<uint16_t> slots;
941 reserveSlots<cpa>(slots, args ...);
943 startWithContextPropertyImpl<cpa>(sc, std::forward<Args>(args) ...);
944 }
catch (std::out_of_range
const&) {
945 Base::markDead(this->buffer_, slots);
950 template <
typename cpa>
951 typename std::enable_if<cpa::broadcast_msg && cpa::create_ipc, void>::type
952 startWithContextPropertyImpl(std::list<uint16_t>& slots) {
956 startWithContextPropertyImpl<cpa>(slots, *purger_, purgerCpuAffinityMask_);
960 template <
typename cpa>
961 typename std::enable_if<!cpa::broadcast_msg || !cpa::create_ipc, void>::type
962 startWithContextPropertyImpl(std::list<uint16_t>& slots) {
965 template <
typename cpa,
typename ...Args>
966 typename std::enable_if<cpa::has_pool, void>::type
967 startWithContextPropertyImpl(std::list<uint16_t>& slots
968 , uint16_t poolThreadCount, uint64_t poolThreadsCpuAffinityMask
971 if (poolThreadCount_) {
972 HMBDC_THROW(std::out_of_range,
"Context pool already started");
974 std::vector<uint16_t> sc(slots.begin(), slots.end());
975 if (!poolThreadsCpuAffinityMask) {
976 auto cpuCount = std::thread::hardware_concurrency();
977 poolThreadsCpuAffinityMask =
978 ((1ul << poolThreadCount) - 1u) << (hmbdcNumbers_.size() % cpuCount);
981 pool_->startAt(poolThreadCount, poolThreadsCpuAffinityMask, sc);
982 while(poolThreadCount--) {
983 if (!cpa::pool_msgless) {
984 hmbdcNumbers_.push_back(*slots.begin());
988 poolThreadCount_ = poolThreadCount;
989 startWithContextPropertyImpl<cpa>(slots, std::forward<Args>(args) ...);
992 template <
typename cpa,
typename Client,
typename ...Args>
993 typename std::enable_if<!std::is_integral<Client>::value,
void>::type
994 startWithContextPropertyImpl(std::list<uint16_t>& slots
995 , Client& c, uint64_t cpuAffinity
997 auto clientParticipateInMessaging =
998 std::decay<Client>::type::INTERESTS_SIZE;
999 uint16_t hmbdcNumber = 0xffffu;
1000 if (clientParticipateInMessaging && cpa::broadcast_msg) {
1001 hmbdcNumber = *slots.begin();
1004 auto thrd = kickOffClientThread(
1005 c, cpuAffinity, hmbdcNumber, hmbdcNumbers_.size());
1006 threads_.push_back(move(thrd));
1007 hmbdcNumbers_.push_back(hmbdcNumber);
1008 startWithContextPropertyImpl<cpa>(slots, std::forward<Args>(args) ...);
1011 template <
typename Client>
1012 auto kickOffClientThread(
1013 Client& c, uint64_t mask, uint16_t hmbdcNumber, uint16_t threadSerialNumber) {
1019 , threadSerialNumber
1021 auto hmbdcNumber = h;
1023 char const* schedule;
1025 auto clientParticipateInMessaging =
1026 std::decay<Client>::type::INTERESTS_SIZE;
1032 if (clientParticipateInMessaging) {
1033 name =
"hmbdc" + std::to_string(hmbdcNumber);
1039 auto cpuAffinityMask = mask;
1040 std::tie(schedule, priority) = c.
schedSpec();
1042 if (!schedule) schedule =
"SCHED_OTHER";
1045 auto cpuCount = std::thread::hardware_concurrency();
1046 cpuAffinityMask = 1ul << (threadSerialNumber % cpuCount);
1049 hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask
1050 , schedule, priority);
1052 hmbdcNumber = clientParticipateInMessaging?hmbdcNumber:0xffffu;
1055 }
catch (std::exception
const& e) {
1058 }
catch (
int code) {
1067 context_detail::runOnceImpl(hmbdcNumber, this->stopped_, this->buffer_, c)) {
1069 if (this->stopped_) c.dropped();
1070 if (clientParticipateInMessaging) context_detail::unblock(this->buffer_, hmbdcNumber);
1079 uint16_t usedHmbdcCapacity_;
1080 std::vector<uint16_t> hmbdcNumbers_;
1083 typename Pool::ptr pool_;
1084 using Threads = std::vector<std::thread>;
1086 size_t poolThreadCount_;
1087 uint32_t secondsBetweenPurge_;
1088 uint64_t purgerCpuAffinityMask_;
1089 typename std::conditional<cpa::broadcast_msg && cpa::create_ipc
1090 , std::unique_ptr<StuckClientPurger<Buffer>>, uint32_t
Definition: MonoLockFreeBuffer.hpp:15
Definition: ContextDetail.hpp:67
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:75
Definition: StuckClientPurger.hpp:11
bool requestReply(RequestT &&request, Sender &sender, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout)
not an interface exposed to end user
Definition: RequestReply.hpp:142
Context template parameter indicating the Context must contain a pool to run Clients and the Clients ...
Definition: Context.hpp:88
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:698
covers the inter-thread and ipc communication facade
Definition: Context.hpp:146
void addToPool(Client &client, Client2 &client2, Args &&...args)
add a bunch of clients to Context's pool - the Clients are run in pool mode
Definition: Context.hpp:630
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:707
Context template parameter indicating each message is sent to one and only one of the clients within ...
Definition: Context.hpp:77
std::tuple< char const *, int > schedSpec() const
an overrideable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:85
Definition: PoolMinus.hpp:9
bool requestReply(Request &&request, ReplyHandler &replyHandler, time::Duration timeout=time::Duration::seconds(0xffffffff))
part of synchronous request reply interface. The caller is blocked until the replies to this request ...
Definition: Context.hpp:788
the default vanilla allocate
Definition: Allocators.hpp:145
void send(Message &&m)
send a message to the Context or attached ipc Contexts
Definition: Context.hpp:262
Unknown exception.
Definition: Exception.hpp:17
void runClientThreadOnce(uint16_t threadSerialNumber, Client &&c)
normally not used until you want to run your own message loop
Definition: Context.hpp:748
Definition: BlockingBuffer.hpp:10
void sendReply(Reply &&reply)
part of synchronous request reply interface. Send a constructed REPLY to the Context, it will only reach the original sender Client's reply callbacks. Although the sender Client is blocked - their reply callbacks are invoked using a different thread. ALL REPLY need to sent out using this interface to avoid undefined behavior.
Definition: Context.hpp:812
void stopped(std::exception const &e) noexcept
the following are for internal use, don't change or override
Definition: Client.hpp:182
void runPoolThreadOnce(uint16_t threadSerialNumberInPool)
normally not used until you want to run your own message loop
Definition: Context.hpp:735
bool trySend(ForwardIt begin, size_t n)
try to send a range of messages to the Context or attached ipc Contexts
Definition: Context.hpp:238
Definition: ContextDetail.hpp:27
void send(ForwardIt begin, size_t n)
send a range of messages to the Context or attached ipc Contexts
Definition: Context.hpp:215
when processes are distributed on a PCIe board and host PC, add this property
Definition: Context.hpp:124
Context(char const *ipcTransportName, uint32_t messageQueueSizePower2Num=MaxMessageSize?20:0, size_t maxPoolClientCount=MaxMessageSize?128:0, size_t maxMessageSizeRuntime=MaxMessageSize, uint64_t purgerCpuAffinityMask=0xfffffffffffffffful, size_t maxThreadSerialNumber=64, size_t ipcTransportDeviceOffset=0)
ctor for construct local ipc Context
Definition: Context.hpp:526
Context template parameter indicating each message is sent to all clients within the Context...
Definition: Context.hpp:57
bool trySend(M0 &&m0, M1 &&m1, Messages &&... msgs)
try to send a batch of messages to the Context or attached ipc Contexts
Definition: Context.hpp:193
void addToPool(Client &client, uint64_t poolThreadAffinityIn=0xfffffffffffffffful)
add a client to Context's pool - the Client is run in pool mode
Definition: Context.hpp:575
Definition: LockFreeBufferT.hpp:18
size_t parallelConsumerAlive() const
how many parallel consumers are started
Definition: Context.hpp:651
size_t clientCountInPool() const
return the number of clients added into pool
Definition: Context.hpp:640
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:477
~Context()
dtor
Definition: Context.hpp:551
Definition: Message.hpp:112
virtual void messageDispatchingStartedCb(uint16_t threadSerialNumber)
called before any messages got dispatched - only once
Definition: Client.hpp:113
Definition: BitMath.hpp:9
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:341
Context template parameter indicating the Context is ipc enabled and it can be attached (see ipc_atta...
Definition: Context.hpp:103
void addToPool(Client &client, uint64_t poolThreadAffinityIn, Args &&...args)
add a bunch of clients to Context's pool - the Clients are run in pool mode
Definition: Context.hpp:609
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
void send(M0 &&m0, M1 &&m1, Messages &&... msgs)
send a batch of messages to the Context or attached ipc Contexts
Definition: Context.hpp:173
Exception that just has an exit code.
Definition: Exception.hpp:28
void setSecondsBetweenPurge(uint32_t s)
ipc_creator Context runs a StuckClientPurger to purge crashed (or slow, stuck ...) Clients from the ip...
Definition: Context.hpp:723
Context(uint32_t messageQueueSizePower2Num=MaxMessageSize?20:2, size_t maxPoolClientCount=MaxMessageSize?128:0, size_t maxMessageSizeRuntime=MaxMessageSize, size_t maxThreadSerialNumber=64)
ctor for construct local non-ipc Context
Definition: Context.hpp:495
void start(Args &&... args)
start the context by specifying what are in it (Pool and/or direct Clients) and their paired up cpu a...
Definition: Context.hpp:688
bool trySendInPlace(Args &&... args)
try send a message to all Clients in the Context or attached ipc Contexts if it wouldn't block ...
Definition: Context.hpp:327
void sendInPlace(Args &&... args)
send a message to all Clients in the Context or attached ipc Contexts
Definition: Context.hpp:304
bool trySend(Message &&m)
try to send a message to the Context or attached ipc Contexts if it wouldn't block ...
Definition: Context.hpp:283
Context template parameter indicating the Context is ipc enabled and it can attach to an ipc transpor...
Definition: Context.hpp:116