1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Message.hpp" 4 #include "hmbdc/app/Logger.hpp" 5 #include "hmbdc/app/Context.hpp" 6 #include "hmbdc/time/Timers.hpp" 7 #include "hmbdc/time/Time.hpp" 8 #include "hmbdc/pattern/BlockingBuffer.hpp" 9 #include "hmbdc/os/Thread.hpp" 10 #include "hmbdc/MetaUtils.hpp" 11 #include "hmbdc/Exception.hpp" 13 #include <type_traits> 20 namespace hmbdc {
namespace app {
21 namespace blocking_context_detail {
22 HMBDC_CLASS_HAS_DECLARE(hmbdc_ctx_queued_ts);
24 template <
typename Interests>
32 using type =
typename std::conditional<
39 template <
bool is_timer_manager>
42 void operator()(C&) {}
48 tm.checkTimers(time::SysTime::now());
52 template <
typename... MessageTuples>
57 template <
typename MessageTuple,
typename... MessageTuples>
66 typename std::enable_if<std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
68 return std::min(tm.untilNextFire(), maxBlockingTime);
72 typename std::enable_if<!std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
74 return maxBlockingTime;
77 template <
typename CcClient>
78 bool runOnceImpl(
bool& HMBDC_RESTRICT stopped
83 tm_runner<std::is_base_of<time::TimerManager, CcClient>::value> tr;
86 const bool clientParticipateInMessaging =
87 std::decay<CcClient>::type::INTERESTS_SIZE != 0;
88 if (clientParticipateInMessaging && buf) {
89 uint64_t count = buf->peek(begin, end);
91 c.CcClient::invokedCb(c.CcClient::handleRangeImpl(b, end, 0xffff));
92 buf->wasteAfterPeek(count);
94 buf->waitItem(waitDuration(c, maxBlockingTime));
97 c.CcClient::invokedCb(0);
98 auto d = waitDuration(c, maxBlockingTime);
99 if (d) usleep(d.microseconds());
101 }
catch (std::exception
const& e) {
108 c.stopped(ExitCode(code));
113 c.stopped(UnknownException());
121 template <
typename CcClient>
122 bool runOnceLoadSharingImpl(
bool& HMBDC_RESTRICT stopped
123 , pattern::BlockingBuffer* HMBDC_RESTRICT buf
124 , CcClient& HMBDC_RESTRICT c, time::Duration maxBlockingTime) {
125 pattern::BlockingBuffer::iterator begin, end;
127 tm_runner<std::is_base_of<time::TimerManager, CcClient>::value> tr;
130 const bool clientParticipateInMessaging =
131 std::decay<CcClient>::type::INTERESTS_SIZE != 0;
132 if (clientParticipateInMessaging && buf) {
133 uint8_t msgWrapped[buf->maxItemSize()] __attribute__((aligned (8)));
134 void* tmp = msgWrapped;
136 if (buf->tryTake(tmp, 0, waitDuration(c, maxBlockingTime))) {
137 disp = MessageDispacher<CcClient, typename CcClient::Interests>()(
138 c, *static_cast<MessageHead*>(tmp));
140 c.CcClient::invokedCb(disp?1:0);
142 c.CcClient::invokedCb(0);
143 auto d = waitDuration(c, maxBlockingTime);
144 if (d) usleep(d.microseconds());
146 }
catch (std::exception
const& e) {
153 c.stopped(ExitCode(code));
158 c.stopped(UnknownException());
166 Transport(
size_t maxItemSize,
size_t capacity)
167 : buffer(maxItemSize +
sizeof(
MessageHead), capacity){}
171 template <
typename Message>
173 std::shared_ptr<Transport> transport;
174 std::function<bool (Message const&)> deliverPred;
175 std::function<bool (uint16_t, uint8_t const*)> deliverJustBytesPred;
180 std::shared_ptr<Transport> transport;
181 std::function<bool (uint16_t, uint8_t const*)> deliverPred;
182 std::function<bool (uint16_t, uint8_t const*)> deliverJustBytesPred;
202 template <MessageTupleC... MessageTuples>
206 template <
typename Message>
struct can_handle;
214 using Interests =
typename cpa::Interests;
217 template <
typename T>
218 bool operator()(T&&) {
return true;}
219 bool operator()(uint16_t, uint8_t
const*) {
return true;}
271 template <
typename Client,
typename DeliverPred = deliverAll>
273 ,
size_t capacity = 1024
275 , uint64_t cpuAffinity = 0
277 , DeliverPred&& pred = DeliverPred()) {
278 auto t =
registerToRun(c, capacity, maxItemSize, std::forward<DeliverPred>(pred));
280 , c, t.use_count() == 1?
nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
281 threads_.push_back(move(thrd));
286 using ClientRegisterHandle = std::shared_ptr<Transport>;
300 template <
typename Client,
typename DeliverPred = deliverAll>
302 ,
size_t capacity = 1024
304 , DeliverPred&& pred = DeliverPred()) {
306 HMBDC_THROW(std::out_of_range,
"maxItemSize is too small");
307 std::shared_ptr<Transport> t(
new Transport(maxItemSize, capacity));
311 msgConduits_, t, std::forward<DeliverPred>(pred));
325 template <
typename CcClient>
327 bool stopped =
false;
328 return runOnceImpl(stopped, &t->buffer, c, maxBlockingTime);
356 template <
typename LoadSharingClientPtrIt,
typename DeliverPred = deliverAll>
357 void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end
358 ,
size_t capacity = 1024
360 typename std::decay<decltype(**LoadSharingClientPtrIt())>::type::Interests
362 , uint64_t cpuAffinity = 0
364 , DeliverPred&& pred = DeliverPred()) {
365 using Client =
typename std::decay<
366 decltype(**LoadSharingClientPtrIt())
369 HMBDC_THROW(std::out_of_range,
"maxItemSize is too small");
370 if (begin == end)
return;
371 std::shared_ptr<Transport> t(
new Transport(maxItemSize, capacity));
376 msgConduits_, t, std::forward<DeliverPred>(pred));
377 while(begin != end) {
379 , **begin++, t.use_count() == 1?
nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
380 threads_.push_back(std::move(thrd));
390 __atomic_thread_fence(__ATOMIC_ACQUIRE);
407 for (
auto& t : threads_) {
421 template <MessageC Message>
425 using M =
typename std::decay<Message>::type;
427 static_assert(i < std::tuple_size<MsgConduits>::value,
"unexpected message type");
428 auto& entries = std::get<i>(msgConduits_);
429 for (
auto i = 0u; i < entries.size(); ++i) {
430 auto& e = entries[i];
431 if (e.deliverPred(m)) {
432 if constexpr (blocking_context_detail::has_hmbdc_ctx_queued_ts<M>::value) {
433 auto& tmp =
const_cast<M&
>(m);
434 tmp.hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
437 if (i == entries.size() - 1) {
439 .template putInPlace<MessageWrap<M>>(std::forward<Message>(m));
441 e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
446 justBytesSend(std::forward<Message>(m));
448 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 463 template <MessageC Message,
typename ... Args>
465 static_assert(!Message::justBytes,
"use sendJustBytesInPlace");
467 justBytesSendInPlace<Message>(args...);
468 using M =
typename std::decay<Message>::type;
470 auto& entries = std::get<i>(msgConduits_);
471 for (
auto i = 0u; i < entries.size(); ++i) {
472 auto& e = entries[i];
473 if (i == entries.size() - 1) {
475 .template putInPlace<MessageWrap<M>>(std::forward<Args>(args)...);
477 e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
481 justBytesSendInPlace<Message>(std::forward<Args>(args)...);
483 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 484 , std::decay<Message>::type::typeTag);)
489 void sendJustBytesInPlace(uint16_t tag,
void const* bytes,
size_t len,
hasMemoryAttachment* att) {
491 [=](
auto&&... args) {
492 (args.applyJustBytes(tag, bytes, len, att), ...) ;
510 template <MessageC Message>
513 if (!justBytesTrySend(m))
return false;
514 using M =
typename std::decay<Message>::type;
516 auto& entries = std::get<i>(msgConduits_);
517 for (
auto i = 0u; i < entries.size(); ++i) {
518 auto& e = entries[i];
519 if (e.deliverPred(m)) {
520 if constexpr (blocking_context_detail::has_hmbdc_ctx_queued_ts<M>::value) {
521 auto& tmp =
const_cast<M&
>(m);
522 tmp.hmbdc_ctx_queued_ts = hmbdc::time::SysTime::now();
524 if (i == entries.size() - 1) {
525 if (!e.transport->buffer
539 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 540 , std::decay<Message>::type::typeTag);)
543 return justBytesTrySend(std::forward<Message>(m));
560 template <MessageC Message,
typename ... Args>
563 if (!justBytesTrySendInPlace<Message>(args...))
return false;
564 using M =
typename std::decay<Message>::type;
566 auto& entries = std::get<i>(msgConduits_);
567 for (
auto i = 0u; i < entries.size(); ++i) {
568 auto& e = entries[i];
569 if (i == entries.size() - 1) {
570 if (!e.transport->buffer.template
575 if (!e.transport->buffer.template
584 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 585 , std::decay<Message>::type::typeTag);)
588 return justBytesTrySendInPlace<Message>(args...);
601 template <MessageForwardIterC ForwardIt>
602 void send(ForwardIt begin,
size_t n) {
603 if constexpr (
can_handle<decltype(*(ForwardIt()))>::match) {
605 justBytesSend(begin, n);
606 using Message = decltype(*begin);
607 using M =
typename std::decay<Message>::type;
609 auto& entries = std::get<i>(msgConduits_);
610 for (
auto i = 0u; i < entries.size(); ++i) {
611 auto& e = entries[i];
612 if (e.deliverPred(*begin)) {
613 while (!e.transport->buffer.template
618 if (!
can_handle<decltype(*(ForwardIt()))>::justbytes) {
619 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 620 , std::decay<decltype(*(ForwardIt()))>::type::typeTag);)
622 justBytesSend(begin, n);
628 template <
typename Message>
629 using TransportEntry = blocking_context_detail::TransportEntry<Message>;
631 template <
typename Message>
634 void applyJustBytes(uint16_t tag,
void const* bytes
636 if (typeTagMatch<Message>(tag)
637 && std::is_trivially_destructible<Message>::value) {
638 auto& entries = *
this;
639 for (
auto i = 0u; i < entries.size(); ++i) {
640 auto& e = entries[i];
645 if (e.deliverJustBytesPred(tag, (uint8_t
const*)bytes)) {
647 , e.transport->buffer.maxItemSize() -
sizeof(
MessageHead));
648 e.transport->buffer.template
649 putInPlace<MessageWrap<JustBytes>>(tag, bytes, len, att);
656 template <
typename Tuple>
struct MCGen;
657 template <
typename ...Messages>
659 using type = std::tuple<TransportEntries<Messages> ...>;
664 MsgConduits msgConduits_;
665 using Threads = std::vector<std::thread>;
669 template <
typename MsgConduits,
typename DeliverPred,
typename Tuple>
671 void operator()(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&){}
675 template <
bool careMsg,
size_t i,
typename M,
typename MsgConduits,
typename DeliverPred>
676 typename std::enable_if<careMsg, void>::type
677 doOrNot(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred) {
678 auto& entries = std::get<i>(msgConduits);
679 if constexpr (std::is_trivially_destructible<M>::value) {
686 template <
bool careMsg,
size_t i,
typename M,
typename MsgConduits,
typename DeliverPred>
687 typename std::enable_if<!careMsg, void>::type
688 doOrNot(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&) {
692 template <
typename MsgConduits,
typename DeliverPred,
typename M,
typename ...Messages>
694 void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred){
696 createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
697 msgConduits, t, pred);
698 setupConduit<MsgConduits, DeliverPred, std::tuple<Messages...>>()
699 (msgConduits,t, std::forward<DeliverPred>(pred));
703 template <
typename Message>
706 using M =
typename std::decay<Message>::type;
710 match = index < std::tuple_size<MsgConduits>::value,
712 justbytes = justbytes_index < std::tuple_size<MsgConduits>::value
713 && std::is_trivially_destructible<M>::value? 1 :0,
717 template <
typename Client,
typename RunOnceFunc>
720 if (buffer ==
nullptr) {
721 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"starting a Client that cannot receive messages, Client typid()=" 722 ,
typeid(
Client).name());)
732 char const* schedule;
741 auto cpuAffinityMask = mask;
742 std::tie(schedule, priority) = c.
schedSpec();
744 if (!schedule) schedule =
"SCHED_OTHER";
746 os::configureCurrentThread(name.c_str(), cpuAffinityMask
747 , schedule, priority);
748 __atomic_add_fetch(&dispStartCount_, 1, __ATOMIC_RELEASE);
750 }
catch (std::exception
const& e) {
761 while(!stopped_ && runOnceFunc(this->stopped_, buffer, c, maxBlockingTime)) {
769 count = buffer->peek(begin, end);
770 buffer->wasteAfterPeek(count);
/**
 * @brief deliver a single message through the JustBytes conduit entries
 *
 * The whole body is compiled only when can_handle<Message>::justbytes holds;
 * otherwise the function does nothing in the lines visible here.
 * For each entry of the conduit at can_handle<Message>::justbytes_index, the
 * message is constructed in the entry's transport buffer (as MessageWrap<M>)
 * when the buffer's maxItemSize() is large enough and the entry's deliverPred
 * accepts the message's type tag and raw bytes.
 *
 * @param m the message to deliver
 * @throws std::out_of_range (via HMBDC_THROW) reporting that maxItemSize is
 *         too small. NOTE(review): the listing lines between the putInPlace
 *         and the throw are not visible here - presumably the throw is the
 *         else-branch of the size check; confirm against the full header.
 */
782 template <MessageC Message>
783 void justBytesSend(Message&& m) {
784 if constexpr (can_handle<Message>::justbytes) {
785 using M =
typename std::decay<Message>::type;
// compile-time position of the JustBytes conduit inside msgConduits_
786 auto constexpr i = can_handle<Message>::justbytes_index;
787 auto& entries = std::get<i>(msgConduits_);
// note: the runtime loop index below shadows the constexpr i above
788 for (
auto i = 0u; i < entries.size(); ++i) {
789 auto& e = entries[i];
// only buffers that can hold a whole MessageWrap<M> are candidates
790 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
// the predicate is given the type tag and the message viewed as raw bytes
791 if (e.deliverPred(m.getTypeTag(), (uint8_t*)&m)) {
792 e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
795 HMBDC_THROW(std::out_of_range
796 ,
"need to increase maxItemSize in start() to >= " 797 <<
sizeof(MessageWrap<M>));
/**
 * @brief construct a message directly in the buffers of the JustBytes
 * conduit entries
 *
 * Compiled only when can_handle<Message>::justbytes holds. JustBytes itself
 * is rejected at compile time (static_assert) - sendJustBytesInPlace is the
 * entry point for that type.
 * In the lines visible here no delivery predicate is consulted: every entry
 * whose buffer maxItemSize() can hold a MessageWrap<M> gets the message
 * constructed in place from args.
 *
 * @param args constructor arguments for the message; passed as args... (not
 *        forwarded) to each entry's putInPlace
 * @throws std::out_of_range (via HMBDC_THROW) reporting that maxItemSize is
 *         too small. NOTE(review): the listing line between the putInPlace
 *         and the throw is not visible here - presumably the throw is the
 *         else-branch of the size check; confirm against the full header.
 */
803 template <MessageC Message,
typename ... Args>
804 void justBytesSendInPlace(Args&& ...args) {
805 if constexpr (can_handle<Message>::justbytes) {
806 using M =
typename std::decay<Message>::type;
807 static_assert(!std::is_same<JustBytes, M>::value,
"use sendJustBytesInPlace");
// compile-time position of the JustBytes conduit inside msgConduits_
808 auto constexpr i = can_handle<Message>::justbytes_index;
809 auto& entries = std::get<i>(msgConduits_);
// note: the runtime loop index below shadows the constexpr i above
810 for (
auto i = 0u; i < entries.size(); ++i) {
811 auto& e = entries[i];
// only buffers that can hold a whole MessageWrap<M> are candidates
812 if (e.transport->buffer.maxItemSize() >=
sizeof(MessageWrap<M>)) {
813 e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
815 HMBDC_THROW(std::out_of_range
816 ,
"need to increase maxItemSize in start() to >= " 817 <<
sizeof(MessageWrap<M>));
/**
 * @brief attempt to deliver a single message through the JustBytes conduit
 * entries, using tryPut with a timeout
 *
 * Compiled only when can_handle<Message>::justbytes holds; JustBytes itself
 * is rejected at compile time (static_assert).
 * For each entry whose buffer maxItemSize() can hold a MessageWrap<M> and
 * whose deliverPred accepts the message's type tag and raw bytes, a
 * MessageWrap<M> copy of m is offered to the buffer via tryPut.
 *
 * @param m the message to deliver
 * @param timeout passed unchanged to each buffer's tryPut
 * @return NOTE(review): the return statements are not among the lines visible
 *         here - presumably false when a tryPut fails and true otherwise;
 *         confirm against the full header.
 * @throws std::out_of_range (via HMBDC_THROW) reporting that maxItemSize is
 *         too small. NOTE(review): presumably the else-branch of the size
 *         check; the connecting lines are not visible here.
 */
823 template <MessageC Message>
824 bool justBytesTrySend(Message&& m, time::Duration timeout) {
825 if constexpr (can_handle<Message>::justbytes) {
826 using M =
typename std::decay<Message>::type;
827 static_assert(!std::is_same<JustBytes, M>::value,
"wrong args");
// compile-time position of the JustBytes conduit inside msgConduits_
828 auto constexpr i = can_handle<Message>::justbytes_index;
829 auto& entries = std::get<i>(msgConduits_);
// note: the runtime loop index below shadows the constexpr i above
830 for (
auto i = 0u; i < entries.size(); ++i) {
831 auto& e = entries[i];
// only buffers that can hold a whole MessageWrap<M> are candidates
832 if (e.transport->buffer.maxItemSize() >=
sizeof(MessageWrap<M>)) {
// the predicate is given the type tag and the message viewed as raw bytes
833 if (e.deliverPred(m.getTypeTag(), (uint8_t*)&m)) {
// a failed tryPut is handled in lines not visible in this listing
834 if (!e.transport->buffer.tryPut(MessageWrap<M>(m), timeout)) {
839 HMBDC_THROW(std::out_of_range
840 ,
"need to increase maxItemSize in start() to >= " 841 <<
sizeof(MessageWrap<M>));
/**
 * @brief attempt to construct a message directly in the buffers of the
 * JustBytes conduit entries
 *
 * Compiled only when can_handle<Message>::justbytes holds.
 * In the lines visible here no delivery predicate is consulted: every entry
 * whose buffer maxItemSize() can hold a MessageWrap<M> is offered the message
 * through tryPutInPlace, constructed from args.
 *
 * @param args constructor arguments for the message; passed as args... (not
 *        forwarded) to each entry's tryPutInPlace
 * @return NOTE(review): the return statements are not among the lines visible
 *         here - presumably false when a tryPutInPlace fails and true
 *         otherwise; confirm against the full header.
 * @throws std::out_of_range (via HMBDC_THROW) reporting that maxItemSize is
 *         too small. NOTE(review): presumably the else-branch of the size
 *         check; the connecting lines are not visible here.
 */
848 template <MessageC Message,
typename ... Args>
849 bool justBytesTrySendInPlace(Args&&... args) {
850 if constexpr (can_handle<Message>::justbytes) {
851 using M =
typename std::decay<Message>::type;
// compile-time position of the JustBytes conduit inside msgConduits_
852 auto constexpr i = can_handle<Message>::justbytes_index;
853 auto& entries = std::get<i>(msgConduits_);
// note: the runtime loop index below shadows the constexpr i above
854 for (
auto i = 0u; i < entries.size(); ++i) {
855 auto& e = entries[i];
// only buffers that can hold a whole MessageWrap<M> are candidates
856 if (e.transport->buffer.maxItemSize() >=
sizeof(MessageWrap<M>)) {
// a failed tryPutInPlace is handled in lines not visible in this listing
857 if (!e.transport->buffer.template
858 tryPutInPlace<MessageWrap<M>>(args...)) {
862 HMBDC_THROW(std::out_of_range
863 ,
"need to increase maxItemSize in start() to >= " 864 <<
sizeof(MessageWrap<M>));
/**
 * @brief deliver a range of n messages through the JustBytes conduit entries
 *
 * Compiled only when can_handle reports justbytes for the iterator's
 * dereferenced type. For each entry whose buffer maxItemSize() can hold a
 * MessageWrap<M>, the delivery predicate is evaluated on the first message
 * of the range only (*begin); when it accepts, the whole batch is put with
 * tryPutBatchInPlace, retried in an empty-bodied loop until it succeeds.
 *
 * @param begin iterator to the first message of the range
 * @param n number of messages passed to tryPutBatchInPlace
 * @throws std::out_of_range (via HMBDC_THROW) reporting that maxItemSize is
 *         too small. NOTE(review): the listing lines between the batch put
 *         and the throw are not visible here - presumably the throw is the
 *         else-branch of the size check; confirm against the full header.
 */
871 template <MessageForwardIterC ForwardIt>
872 void justBytesSend(ForwardIt begin,
size_t n) {
873 if constexpr (can_handle<decltype(*(ForwardIt()))>::justbytes) {
874 using Message = decltype(*begin);
875 using M =
typename std::decay<Message>::type;
// compile-time position of the JustBytes conduit inside msgConduits_
876 auto constexpr i = can_handle<decltype(*(ForwardIt()))>::justbytes_index;
877 auto& entries = std::get<i>(msgConduits_);
// note: the runtime loop index below shadows the constexpr i above
878 for (
auto i = 0u; i < entries.size(); ++i) {
879 auto& e = entries[i];
// only buffers that can hold a whole MessageWrap<M> are candidates
881 if (e.transport->buffer.maxItemSize() >=
sizeof(MessageWrap<M>)) {
// only the first message of the range is shown to the predicate
882 if (e.deliverPred(begin->getTypeTag(), (uint8_t*)&*begin)) {
// spin until the buffer accepts the whole batch
883 while (!e.transport->buffer.template tryPutBatchInPlace<MessageWrap<M>>(begin, n)) {}
886 HMBDC_THROW(std::out_of_range
887 ,
"need to increase maxItemSize in start() to >= " 888 <<
sizeof(MessageWrap<M>));
895 size_t dispStartCount_ = 0;
Definition: BlockingBuffer.hpp:17
Definition: MetaUtils.hpp:84
void sendInPlace(Args &&... args)
send a message by directly constructing the message in receiving message queue
Definition: BlockingContext.hpp:464
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:137
Definition: BlockingContext.hpp:632
void send(ForwardIt begin, size_t n)
send a range of messages via the BlockingContext
Definition: BlockingContext.hpp:602
bool runOnce(ClientRegisterHandle &t, CcClient &c, time::Duration maxBlockingTime)
run the previously registered Client to process existing Messages
Definition: BlockingContext.hpp:326
typename std::conditional< index_in_tuple< JustBytes, Interests >::value==std::tuple_size< Interests >::value, Interests, typename merge_tuple_unique< std::tuple< JustBytes >, no_trivial >::type >::type type
avoid double delivery when JustBytes is involved
Definition: BlockingContext.hpp:36
bool trySendInPlace(Args &&... args)
best effort to send a message by directly constructing the message in receiving message queue ...
Definition: BlockingContext.hpp:561
Definition: TypedString.hpp:84
Definition: BlockingContext.hpp:53
std::tuple< char const *, int > schedSpec() const
an overrideable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:147
Definition: BlockingContext.hpp:25
Definition: Timers.hpp:70
Definition: BlockingContext.hpp:656
Definition: MetaUtils.hpp:48
Definition: BlockingContext.hpp:165
Unknown exception.
Definition: Exception.hpp:17
Definition: BlockingContext.hpp:172
Definition: BlockingContext.hpp:206
bool trySend(Message &&m, time::Duration timeout=time::Duration::seconds(0))
best effort to send a message via the BlockingContext
Definition: BlockingContext.hpp:511
Definition: BlockingContext.hpp:216
ClientRegisterHandle registerToRun(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, DeliverPred &&pred=DeliverPred())
same as start as to the parameter arguments, except it just registers the Client with the context with...
Definition: BlockingContext.hpp:301
void stopped(std::exception const &e) noexcept
the following are for internal use, don't change or override
Definition: Client.hpp:248
A BlockingContext is like a media object that facilitates the communications for the Clients that it ...
Definition: BlockingContext.hpp:203
Definition: Message.hpp:212
Definition: BlockingContext.hpp:674
A special type of message only used on the receiving side.
Definition: Message.hpp:341
void start(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a client within the Context. The client is powered by a single OS thread.
Definition: BlockingContext.hpp:272
Definition: MetaUtils.hpp:265
void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename std::decay< decltype(**LoadSharingClientPtrIt())>::type::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a group of clients within the Context that collectively process messages in a load sharing m...
Definition: BlockingContext.hpp:357
void join()
wait until all threads of the Context exit
Definition: BlockingContext.hpp:406
void stop()
stop the message dispatching - asynchronously
Definition: BlockingContext.hpp:389
Definition: Message.hpp:263
BlockingContext()
trivial ctor
Definition: BlockingContext.hpp:225
Definition: BlockingContext.hpp:670
Definition: MetaUtils.hpp:100
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Exception that just has an exit code.
Definition: Exception.hpp:28
virtual void messageDispatchingStartedCb(size_t const *pClientDispatchingStarted)
called before any messages get dispatched - only once
Definition: Client.hpp:179
Definition: BlockingBuffer.hpp:264
void send(Message &&m)
send a message to the BlockingContext to dispatch
Definition: BlockingContext.hpp:422
a std tuple holding the message types it can dispatch
Definition: BlockingContext.hpp:206
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
auto kickOffClientThread(RunOnceFunc runOnceFunc, Client &c, pattern::BlockingBuffer *buffer, uint64_t mask, time::Duration maxBlockingTime)
Definition: BlockingContext.hpp:718
Definition: BlockingContext.hpp:40