1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Message.hpp" 4 #include "hmbdc/app/RequestReply.hpp" 5 #include "hmbdc/app/Logger.hpp" 6 #include "hmbdc/app/Context.hpp" 7 #include "hmbdc/time/Timers.hpp" 8 #include "hmbdc/time/Time.hpp" 9 #include "hmbdc/pattern/BlockingBuffer.hpp" 10 #include "hmbdc/os/Thread.hpp" 11 #include "hmbdc/MetaUtils.hpp" 12 #include "hmbdc/Exception.hpp" 14 #include <type_traits> 21 namespace hmbdc {
namespace app {
22 namespace blocking_context_detail {
24 template <
bool is_timer_manager>
27 void operator()(C&) {}
33 tm.checkTimers(time::SysTime::now());
37 template <
typename... MessageTuples>
44 template <
typename MessageTuple,
typename... MessageTuples>
47 typename templatized_subtractor<REPLY, MessageTuple>::type
52 typename templatized_aggregator<REQUEST, MessageTuple>::type
57 typename templatized_aggregator<REPLY, MessageTuple>::type
64 typename std::enable_if<std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
66 return std::min(tm.untilNextFire(), maxBlockingTime);
70 typename std::enable_if<!std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
72 return maxBlockingTime;
75 template <
typename CcClient>
76 bool runOnceImpl(
bool& HMBDC_RESTRICT stopped
84 const bool clientParticipateInMessaging =
85 std::decay<CcClient>::type::INTERESTS_SIZE != 0;
86 if (clientParticipateInMessaging && buf) {
87 uint64_t count = buf->peek(begin, end);
89 c.CcClient::handleRangeImpl(b, end, 0xffff);
90 c.CcClient::invokedCb(0xffff);
91 buf->wasteAfterPeek(count);
93 buf->waitItem(waitDuration(c, maxBlockingTime));
96 c.CcClient::invokedCb(0xffffu);
97 auto d = waitDuration(c, maxBlockingTime);
98 if (d) usleep(d.microseconds());
100 }
catch (std::exception
const& e) {
120 template <
typename CcClient>
121 bool runOnceLoadSharingImpl(
bool& HMBDC_RESTRICT stopped
129 const bool clientParticipateInMessaging =
130 std::decay<CcClient>::type::INTERESTS_SIZE != 0;
131 if (clientParticipateInMessaging && buf) {
132 uint8_t msgWrapped[buf->maxItemSize()] __attribute__((aligned (8)));
133 void* tmp = msgWrapped;
134 if (buf->tryTake(tmp, 0, waitDuration(c, maxBlockingTime))) {
135 MessageDispacher<CcClient, typename CcClient::Interests>()(
136 c, *static_cast<MessageHead*>(tmp));
138 c.CcClient::invokedCb(0xffffu);
140 c.CcClient::invokedCb(0xffffu);
141 auto d = waitDuration(c, maxBlockingTime);
142 if (d) usleep(d.microseconds());
144 }
catch (std::exception
const& e) {
179 template <MessageTupleC... MessageTuples>
183 template <
typename Message>
struct can_handle;
191 using Interests =
typename cpa::Interests;
199 using Requests =
typename cpa::Requests;
207 using Replies =
typename cpa::Replies;
211 template <
typename T>
212 bool operator()(T&&) {
return true;}
218 template <
typename U = Requests>
220 std::tuple_size<U>::value == 0,
void>::type* =
nullptr)
223 template <
typename U = Requests>
225 std::tuple_size<U>::value,
void>::type* =
nullptr)
274 template <
typename Client,
typename DeliverPred = deliverAll>
276 ,
size_t capacity = 1024
278 , uint64_t cpuAffinity = 0
280 , DeliverPred&& pred = DeliverPred()) {
282 HMBDC_THROW(std::out_of_range,
"maxItemSize is too small");
283 std::shared_ptr<Transport> t(
new Transport(maxItemSize, capacity));
285 msgConduits_, t, std::forward<DeliverPred>(pred));
286 auto thrd = kickOffClientThread(blocking_context_detail::runOnceImpl<Client>
287 , c, t.use_count() == 1?
nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
288 threads_.push_back(move(thrd));
316 template <
typename LoadSharingClientPtrIt,
typename DeliverPred = deliverAll>
317 void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end
318 ,
size_t capacity = 1024
320 typename std::decay<decltype(**LoadSharingClientPtrIt())>::type::Interests
322 , uint64_t cpuAffinity = 0
324 , DeliverPred&& pred = DeliverPred()) {
325 using Client =
typename std::decay<
326 decltype(**LoadSharingClientPtrIt())
329 HMBDC_THROW(std::out_of_range,
"maxItemSize is too small");
330 if (begin == end)
return;
331 std::shared_ptr<Transport> t(
new Transport(maxItemSize, capacity));
333 msgConduits_, t, std::forward<DeliverPred>(pred));
334 while(begin != end) {
335 auto thrd = kickOffClientThread(blocking_context_detail::runOnceLoadSharingImpl<Client>
336 , **begin++, t.use_count() == 1?
nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
337 threads_.push_back(std::move(thrd));
347 __atomic_thread_fence(__ATOMIC_ACQUIRE);
356 for (
auto& t : threads_) {
370 template <MessageC Message>
371 typename std::enable_if<can_handle<Message>::match,
void>::type
374 using M =
typename std::decay<Message>::type;
376 static_assert(i < std::tuple_size<MsgConduits>::value,
"unexpected message type");
377 auto& entries = std::get<i>(msgConduits_);
378 for (
auto i = 0u; i < entries.size(); ++i) {
379 auto& e = entries[i];
380 if (e.diliverPred(m)) {
381 if (i == entries.size() - 1) {
383 .template putInPlace<MessageWrap<M>>(std::forward<Message>(m));
385 e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
390 template <MessageC Message>
391 typename std::enable_if<!can_handle<Message>::match,
void>::type
393 justBytesSend(std::forward<Message>(m));
395 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 396 , std::decay<Message>::type::typeTag);)
409 template <MessageC Message, typename ... Args>
412 justBytesSendInPlace<Message>(args...);
413 using M =
typename std::decay<Message>::type;
415 auto& entries = std::get<i>(msgConduits_);
416 for (
auto i = 0u; i < entries.size(); ++i) {
417 auto& e = entries[i];
418 if (i == entries.size() - 1) {
420 .template putInPlace<MessageWrap<M>>(std::forward<Args>(args)...);
422 e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
426 template <MessageC Message,
typename ... Args>
427 typename std::enable_if<!can_handle<Message>::match,
void>::type
428 sendInPlace(Args&& ...args) {
429 justBytesSendInPlace<Message>(std::forward<Args>(args)...);
431 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 432 , std::decay<Message>::type::typeTag);)
448 template <MessageC Message>
451 if (!justBytesTrySend(m))
return false;
452 using M =
typename std::decay<Message>::type;
454 auto& entries = std::get<i>(msgConduits_);
455 for (
auto i = 0u; i < entries.size(); ++i) {
456 auto& e = entries[i];
457 if (e.diliverPred(m)) {
458 if (i == entries.size() - 1) {
459 if (!e.transport->buffer
473 template <MessageC Message>
474 typename std::enable_if<!can_handle<Message>::match,
bool>::type
475 trySend(Message&& m,
time::Duration = time::Duration::seconds(0)) {
477 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 478 , std::decay<Message>::type::typeTag);)
481 return justBytesTrySend(std::forward<Message>(m));
498 template <MessageC Message,
typename ... Args>
499 typename std::enable_if<can_handle<Message>::match,
bool>::type
501 if (!justBytesTrySendInPlace<Message>(args...))
return false;
502 using M =
typename std::decay<Message>::type;
504 auto& entries = std::get<i>(msgConduits_);
505 for (
auto i = 0u; i < entries.size(); ++i) {
506 auto& e = entries[i];
507 if (i == entries.size() - 1) {
508 if (!e.transport->buffer.template
513 if (!e.transport->buffer.template
522 template <MessageC Message,
typename ... Args>
523 typename std::enable_if<!can_handle<Message>::match,
bool>::type
524 trySendInPlace(Args&&... args) {
526 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 527 , std::decay<Message>::type::typeTag);)
530 return justBytesTrySendInPlace<Message>(args...);
542 template <MessageForwardIterC ForwardIt>
543 typename std::enable_if<can_handle<decltype(*(ForwardIt()))>::match,
void>::type
544 send(ForwardIt begin,
size_t n) {
546 justBytesSend(begin, n);
547 using Message = decltype(*begin);
548 using M =
typename std::decay<Message>::type;
550 auto& entries = std::get<i>(msgConduits_);
551 for (
auto i = 0u; i < entries.size(); ++i) {
552 auto& e = entries[i];
553 if (e.diliverPred(*begin)) {
554 while (!e.transport->buffer.template
560 template <MessageForwardIterC ForwardIt>
561 typename std::enable_if<!can_handle<decltype(*(ForwardIt()))>::match,
void>::type
562 send(ForwardIt begin,
size_t n) {
563 if (!
can_handle<decltype(*(ForwardIt()))>::justbytes) {
564 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 565 , std::decay<decltype(*(ForwardIt()))>::type::typeTag);)
567 justBytesSend(begin, n);
598 template <MessageC M,
typename ReplyHandler>
602 static_assert(std::tuple_size<Requests>::value != 0
603 ,
"no REQUEST is declared for this Context");
620 template <ReplyC ReplyT>
623 using R =
typename std::decay<ReplyT>::type;
624 static_assert(std::is_trivially_destructible<R>::value,
"unsupported");
625 static_assert(std::tuple_size<Requests>::value != 0
626 ,
"no REQUEST is declared for this Context, no one would be waiting");
627 rrCtx_.send(std::forward<ReplyT>(reply));
634 Transport(
size_t maxItemSize,
size_t capacity)
635 : buffer(maxItemSize +
sizeof(
MessageHead), capacity){}
639 template <
typename Message>
641 std::shared_ptr<Transport> transport;
642 std::function<bool (Message const&)> diliverPred;
645 template <
typename Message>
646 using TransportEntries = std::vector<TransportEntry<Message>>;
648 template <
typename Tuple>
struct MCGen;
649 template <
typename ...Messages>
651 using type = std::tuple<TransportEntries<Messages> ...>;
656 MsgConduits msgConduits_;
657 using Threads = std::vector<std::thread>;
660 typename std::conditional<std::tuple_size<Requests>::value
667 template <
typename MsgConduits,
typename DeliverPred,
typename Tuple>
669 void operator()(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&){}
673 template <
bool careMsg,
size_t i,
typename M,
typename MsgConduits,
typename DeliverPred>
674 typename std::enable_if<careMsg, void>::type
675 doOrNot(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred) {
676 auto& entries = std::get<i>(msgConduits);
680 template <
bool careMsg,
size_t i,
typename M,
typename MsgConduits,
typename DeliverPred>
681 typename std::enable_if<!careMsg, void>::type
682 doOrNot(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&) {
686 template <
typename MsgConduits,
typename DeliverPred,
typename M,
typename ...Messages>
688 void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred){
690 createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
691 msgConduits, t, pred);
692 setupConduit<MsgConduits, DeliverPred, std::tuple<Messages...>>()
693 (msgConduits,t, std::forward<DeliverPred>(pred));
697 template <
typename MsgConduits,
typename M,
typename ...Messages>
699 void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t,
void*){
701 createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
702 msgConduits, t, [](M
const&){
return true;});
703 setupConduit<MsgConduits,
void*, std::tuple<Messages...>>()
704 (msgConduits, t,
nullptr);
708 template <
typename Message>
711 using M =
typename std::decay<Message>::type;
715 match = index < std::tuple_size<MsgConduits>::value,
717 justbytes = justbytes_index < std::tuple_size<MsgConduits>::value
718 && std::is_trivially_destructible<M>::value? 1 :0,
722 template <
typename Client,
typename RunOnceFunc>
725 if (buffer ==
nullptr) {
726 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"starting a Client that cannot receive messages, Client typid()=" 727 ,
typeid(
Client).name());)
737 char const* schedule;
745 auto cpuAffinityMask = mask;
746 std::tie(schedule, priority) = c.
schedSpec();
748 if (!schedule) schedule =
"SCHED_OTHER";
750 os::configureCurrentThread(name.c_str(), cpuAffinityMask
751 , schedule, priority);
754 while(!stopped_ && runOnceFunc(this->stopped_, buffer, c, maxBlockingTime)) {
756 if (this->stopped_) c.dropped();
764 template <MessageC Message>
765 typename std::enable_if<can_handle<Message>::justbytes,
void>::type
766 justBytesSend(Message&& m) {
767 using M =
typename std::decay<Message>::type;
769 auto& entries = std::get<i>(msgConduits_);
770 for (
auto i = 0u; i < entries.size(); ++i) {
771 auto& e = entries[i];
772 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
773 e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
775 HMBDC_THROW(std::out_of_range
776 ,
"need to increase maxItemSize in start() to >= " 782 template <MessageC Message>
783 typename std::enable_if<!can_handle<Message>::justbytes,
void>::type
784 justBytesSend(Message&& m) {
787 template <MessageC Message,
typename ... Args>
788 typename std::enable_if<can_handle<Message>::justbytes,
void>::type
789 justBytesSendInPlace(Args&& ...args) {
790 using M =
typename std::decay<Message>::type;
792 auto& entries = std::get<i>(msgConduits_);
793 for (
auto i = 0u; i < entries.size(); ++i) {
794 auto& e = entries[i];
795 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
796 e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
798 HMBDC_THROW(std::out_of_range
799 ,
"need to increase maxItemSize in start() to >= " 805 template <MessageC Message,
typename ... Args>
806 typename std::enable_if<!can_handle<Message>::justbytes,
void>::type
807 justBytesSendInPlace(Args&&... args) {
810 template <MessageC Message>
811 typename std::enable_if<can_handle<Message>::justbytes,
bool>::type
813 using M =
typename std::decay<Message>::type;
815 auto& entries = std::get<i>(msgConduits_);
816 for (
auto i = 0u; i < entries.size(); ++i) {
817 auto& e = entries[i];
818 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
823 HMBDC_THROW(std::out_of_range
824 ,
"need to increase maxItemSize in start() to >= " 830 template <MessageC Message>
831 typename std::enable_if<!can_handle<Message>::justbytes,
bool>::type
836 template <MessageC Message,
typename ... Args>
837 typename std::enable_if<can_handle<Message>::justbytes,
bool>::type
838 justBytesTrySendInPlace(Args&&... args) {
839 using M =
typename std::decay<Message>::type;
841 auto& entries = std::get<i>(msgConduits_);
842 for (
auto i = 0u; i < entries.size(); ++i) {
843 auto& e = entries[i];
844 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
845 if (!e.transport->buffer.template
850 HMBDC_THROW(std::out_of_range
851 ,
"need to increase maxItemSize in start() to >= " 857 template <MessageC Message,
typename ... Args>
858 typename std::enable_if<!can_handle<Message>::justbytes,
bool>::type
859 justBytesTrySendInPlace(Args&&... args) {
863 template <MessageForwardIterC ForwardIt>
864 typename std::enable_if<can_handle<decltype(*(ForwardIt()))>::justbytes,
void>::type
865 justBytesSend(ForwardIt begin,
size_t n) {
866 using Message = decltype(*begin);
867 using M =
typename std::decay<Message>::type;
869 auto& entries = std::get<i>(msgConduits_);
870 for (
auto i = 0u; i < entries.size(); ++i) {
871 auto& e = entries[i];
873 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
874 while (!e.transport->buffer.template tryPutBatchInPlace<
MessageWrap<M>>(begin, n)) {}
876 HMBDC_THROW(std::out_of_range
877 ,
"need to increase maxItemSize in start() to >= " 883 template <MessageForwardIterC ForwardIt>
884 typename std::enable_if<!can_handle<decltype(*(ForwardIt()))>::justbytes,
void>::type
885 justBytesSend(ForwardIt begin,
size_t n) {
Definition: BlockingBuffer.hpp:16
Definition: MetaUtils.hpp:80
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:75
bool requestReply(RequestT &&request, Sender &sender, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout)
not an interface exposed to end user
Definition: RequestReply.hpp:140
std::enable_if< can_handle< decltype(*(ForwardIt()))>::match, void >::type send(ForwardIt begin, size_t n)
send a range of messages via the BlockingContext
Definition: BlockingContext.hpp:544
Definition: BlockingContext.hpp:633
Definition: TypedString.hpp:76
Definition: BlockingContext.hpp:38
std::tuple< char const *, int > schedSpec() const
an overridable method. Returns the schedule policy and priority; override if necessary. priority is o...
Definition: Client.hpp:85
Definition: Timers.hpp:66
Definition: BlockingContext.hpp:648
Definition: MetaUtils.hpp:44
std::enable_if< can_handle< Message >::match, void >::type send(Message &&m)
send a message to the BlockingContext to dispatch
Definition: BlockingContext.hpp:372
Definition: BlockingContext.hpp:640
Unknown exception.
Definition: Exception.hpp:17
Definition: BlockingContext.hpp:183
std::enable_if< can_handle< Message >::match, void >::type sendInPlace(Args &&... args)
send a message by directly constructing the message in receiving message queue
Definition: BlockingContext.hpp:411
Definition: BlockingContext.hpp:210
a std tuple holding REPLY messages types it can dispatch
BlockingContext(typename std::enable_if< std::tuple_size< U >::value==0, void >::type *=nullptr)
trivial ctor
Definition: BlockingContext.hpp:219
A BlockingContext is like a media object that facilitates the communications for the Clients that it ...
Definition: BlockingContext.hpp:180
Definition: Message.hpp:78
Definition: BlockingContext.hpp:672
bool requestReply(REQUEST< M > &&request, ReplyHandler &replyHandler, time::Duration timeout=time::Duration::seconds(0xffffffff))
part of synchronous request reply interface. The caller is blocked until the replies to this request ...
Definition: BlockingContext.hpp:600
void start(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a client within the Context. The client is powered by a single OS thread.
Definition: BlockingContext.hpp:275
Context template parameter indicating each message is sent to all clients within the Context...
Definition: Context.hpp:57
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:477
void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename std::decay< decltype(**LoadSharingClientPtrIt())>::type::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a group of clients within the Context that collectively process messages in a load sharing m...
Definition: BlockingContext.hpp:317
void join()
wait until all threads of the Context exit
Definition: BlockingContext.hpp:355
void stop()
stop the message dispatching - asynchronously
Definition: BlockingContext.hpp:346
Definition: Message.hpp:112
Definition: BlockingContext.hpp:668
virtual void messageDispatchingStartedCb(uint16_t threadSerialNumber)
called before any messages got dispatched - only once
Definition: Client.hpp:109
Definition: MetaUtils.hpp:96
std::enable_if< can_handle< Message >::match, bool >::type trySend(Message &&m, time::Duration timeout=time::Duration::seconds(0))
best effort to send a message via the BlockingContext
Definition: BlockingContext.hpp:450
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:57
Exception that just has an exit code.
Definition: Exception.hpp:28
Definition: BlockingBuffer.hpp:253
a std tuple holding messages types it can dispatch except REPLYs
Definition: BlockingContext.hpp:183
std::enable_if< can_handle< Message >::match, bool >::type trySendInPlace(Args &&... args)
best effort to send a message by directly constructing the message in receiving message queue ...
Definition: BlockingContext.hpp:500
Definition: BlockingContext.hpp:25
a std tuple holding REQUEST messages types it can dispatch
template that convert a regular message to be a request used in request / reply sync messaging - see ...
Definition: Message.hpp:339
void sendReply(ReplyT &&reply)
part of synchronous request reply interface. Send a constructed REPLY to the Context; it will only reach the original sender Client. ALL REPLYs need to be sent out using this interface to avoid undefined behavior.
Definition: BlockingContext.hpp:622