1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Message.hpp" 4 #include "hmbdc/app/RequestReply.hpp" 5 #include "hmbdc/app/Logger.hpp" 6 #include "hmbdc/app/Context.hpp" 7 #include "hmbdc/time/Timers.hpp" 8 #include "hmbdc/time/Time.hpp" 9 #include "hmbdc/pattern/BlockingBuffer.hpp" 10 #include "hmbdc/os/Thread.hpp" 11 #include "hmbdc/MetaUtils.hpp" 12 #include "hmbdc/Exception.hpp" 14 #include <type_traits> 21 namespace hmbdc {
namespace app {
22 namespace blocking_context_detail {
24 template <
bool is_timer_manager>
27 void operator()(C&) {}
33 tm.checkTimers(time::SysTime::now());
37 template <
typename... MessageTuples>
44 template <
typename MessageTuple,
typename... MessageTuples>
47 typename templatized_subtractor<REPLY, MessageTuple>::type
52 typename templatized_aggregator<REQUEST, MessageTuple>::type
57 typename templatized_aggregator<REPLY, MessageTuple>::type
64 typename std::enable_if<std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
66 return std::min(tm.untilNextFire(), maxBlockingTime);
70 typename std::enable_if<!std::is_base_of<time::TimerManager, T>::value,
time::Duration>::type
72 return maxBlockingTime;
75 template <
typename CcClient>
76 bool runOnceImpl(
bool& HMBDC_RESTRICT stopped
84 const bool clientParticipateInMessaging =
85 std::decay<CcClient>::type::INTERESTS_SIZE != 0;
86 if (clientParticipateInMessaging && buf) {
87 uint64_t count = buf->peek(begin, end);
89 c.CcClient::invokedCb(c.CcClient::handleRangeImpl(b, end, 0xffff));
90 buf->wasteAfterPeek(count);
92 buf->waitItem(waitDuration(c, maxBlockingTime));
95 c.CcClient::invokedCb(0);
96 auto d = waitDuration(c, maxBlockingTime);
97 if (d) usleep(d.microseconds());
99 }
catch (std::exception
const& e) {
119 template <
typename CcClient>
120 bool runOnceLoadSharingImpl(
bool& HMBDC_RESTRICT stopped
128 const bool clientParticipateInMessaging =
129 std::decay<CcClient>::type::INTERESTS_SIZE != 0;
130 if (clientParticipateInMessaging && buf) {
131 uint8_t msgWrapped[buf->maxItemSize()] __attribute__((aligned (8)));
132 void* tmp = msgWrapped;
134 if (buf->tryTake(tmp, 0, waitDuration(c, maxBlockingTime))) {
135 disp = MessageDispacher<CcClient, typename CcClient::Interests>()(
136 c, *static_cast<MessageHead*>(tmp));
138 c.CcClient::invokedCb(disp?1:0);
140 c.CcClient::invokedCb(0);
141 auto d = waitDuration(c, maxBlockingTime);
142 if (d) usleep(d.microseconds());
144 }
catch (std::exception
const& e) {
179 template <MessageTupleC... MessageTuples>
183 template <
typename Message>
struct can_handle;
191 using Interests =
typename cpa::Interests;
199 using Requests =
typename cpa::Requests;
207 using Replies =
typename cpa::Replies;
211 template <
typename T>
212 bool operator()(T&&) {
return true;}
218 template <
typename U = Requests>
220 std::tuple_size<U>::value == 0,
void>::type* =
nullptr)
223 template <
typename U = Requests>
225 std::tuple_size<U>::value,
void>::type* =
nullptr)
274 template <
typename Client,
typename DeliverPred = deliverAll>
276 ,
size_t capacity = 1024
278 , uint64_t cpuAffinity = 0
280 , DeliverPred&& pred = DeliverPred()) {
282 HMBDC_THROW(std::out_of_range,
"maxItemSize is too small");
283 std::shared_ptr<Transport> t(
new Transport(maxItemSize, capacity));
285 msgConduits_, t, std::forward<DeliverPred>(pred));
286 auto thrd = kickOffClientThread(blocking_context_detail::runOnceImpl<Client>
287 , c, t.use_count() == 1?
nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
288 threads_.push_back(move(thrd));
316 template <
typename LoadSharingClientPtrIt,
typename DeliverPred = deliverAll>
317 void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end
318 ,
size_t capacity = 1024
320 typename std::decay<decltype(**LoadSharingClientPtrIt())>::type::Interests
322 , uint64_t cpuAffinity = 0
324 , DeliverPred&& pred = DeliverPred()) {
325 using Client =
typename std::decay<
326 decltype(**LoadSharingClientPtrIt())
329 HMBDC_THROW(std::out_of_range,
"maxItemSize is too small");
330 if (begin == end)
return;
331 std::shared_ptr<Transport> t(
new Transport(maxItemSize, capacity));
333 msgConduits_, t, std::forward<DeliverPred>(pred));
334 while(begin != end) {
335 auto thrd = kickOffClientThread(blocking_context_detail::runOnceLoadSharingImpl<Client>
336 , **begin++, t.use_count() == 1?
nullptr:&t->buffer, cpuAffinity, maxBlockingTime);
337 threads_.push_back(std::move(thrd));
347 __atomic_thread_fence(__ATOMIC_ACQUIRE);
356 for (
auto& t : threads_) {
370 template <MessageC Message>
371 typename std::enable_if<can_handle<Message>::match,
void>::type
374 using M =
typename std::decay<Message>::type;
376 static_assert(i < std::tuple_size<MsgConduits>::value,
"unexpected message type");
377 auto& entries = std::get<i>(msgConduits_);
378 for (
auto i = 0u; i < entries.size(); ++i) {
379 auto& e = entries[i];
380 if (e.diliverPred(m)) {
381 if (i == entries.size() - 1) {
383 .template putInPlace<MessageWrap<M>>(std::forward<Message>(m));
385 e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
390 template <MessageC Message>
391 typename std::enable_if<!can_handle<Message>::match,
void>::type
393 justBytesSend(std::forward<Message>(m));
395 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 396 , std::decay<Message>::type::typeTag);)
409 template <MessageC Message, typename ... Args>
412 justBytesSendInPlace<Message>(args...);
413 using M =
typename std::decay<Message>::type;
415 auto& entries = std::get<i>(msgConduits_);
416 for (
auto i = 0u; i < entries.size(); ++i) {
417 auto& e = entries[i];
418 if (i == entries.size() - 1) {
420 .template putInPlace<MessageWrap<M>>(std::forward<Args>(args)...);
422 e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
426 template <MessageC Message,
typename ... Args>
427 typename std::enable_if<!can_handle<Message>::match,
void>::type
428 sendInPlace(Args&& ...args) {
429 justBytesSendInPlace<Message>(std::forward<Args>(args)...);
431 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 432 , std::decay<Message>::type::typeTag);)
448 template <MessageC Message>
451 if (!justBytesTrySend(m))
return false;
452 using M =
typename std::decay<Message>::type;
454 auto& entries = std::get<i>(msgConduits_);
455 for (
auto i = 0u; i < entries.size(); ++i) {
456 auto& e = entries[i];
457 if (e.diliverPred(m)) {
458 if (i == entries.size() - 1) {
459 if (!e.transport->buffer
473 template <MessageC Message>
474 typename std::enable_if<!can_handle<Message>::match,
bool>::type
475 trySend(Message&& m,
time::Duration = time::Duration::seconds(0)) {
477 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 478 , std::decay<Message>::type::typeTag);)
481 return justBytesTrySend(std::forward<Message>(m));
498 template <MessageC Message,
typename ... Args>
499 typename std::enable_if<can_handle<Message>::match,
bool>::type
501 if (!justBytesTrySendInPlace<Message>(args...))
return false;
502 using M =
typename std::decay<Message>::type;
504 auto& entries = std::get<i>(msgConduits_);
505 for (
auto i = 0u; i < entries.size(); ++i) {
506 auto& e = entries[i];
507 if (i == entries.size() - 1) {
508 if (!e.transport->buffer.template
513 if (!e.transport->buffer.template
522 template <MessageC Message,
typename ... Args>
523 typename std::enable_if<!can_handle<Message>::match,
bool>::type
524 trySendInPlace(Args&&... args) {
526 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 527 , std::decay<Message>::type::typeTag);)
530 return justBytesTrySendInPlace<Message>(args...);
542 template <MessageForwardIterC ForwardIt>
543 typename std::enable_if<can_handle<decltype(*(ForwardIt()))>::match,
void>::type
544 send(ForwardIt begin,
size_t n) {
546 justBytesSend(begin, n);
547 using Message = decltype(*begin);
548 using M =
typename std::decay<Message>::type;
550 auto& entries = std::get<i>(msgConduits_);
551 for (
auto i = 0u; i < entries.size(); ++i) {
552 auto& e = entries[i];
553 if (e.diliverPred(*begin)) {
554 while (!e.transport->buffer.template
560 template <MessageForwardIterC ForwardIt>
561 typename std::enable_if<!can_handle<decltype(*(ForwardIt()))>::match,
void>::type
562 send(ForwardIt begin,
size_t n) {
563 if (!
can_handle<decltype(*(ForwardIt()))>::justbytes) {
564 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"unhandled message sent typeTag=" 565 , std::decay<decltype(*(ForwardIt()))>::type::typeTag);)
567 justBytesSend(begin, n);
598 template <MessageC M,
typename ReplyHandler>
602 static_assert(std::tuple_size<Requests>::value != 0
603 ,
"no REQUEST is declared for this Context");
620 template <ReplyC ReplyT>
623 using R =
typename std::decay<ReplyT>::type;
624 static_assert(std::is_trivially_destructible<R>::value,
"unsupported");
625 static_assert(std::tuple_size<Requests>::value != 0
626 ,
"no REQUEST is declared for this Context, no one would be waiting");
627 rrCtx_.send(std::forward<ReplyT>(reply));
634 Transport(
size_t maxItemSize,
size_t capacity)
635 : buffer(maxItemSize +
sizeof(
MessageHead), capacity){}
639 template <
typename Message>
641 std::shared_ptr<Transport> transport;
642 std::function<bool (Message const&)> diliverPred;
645 template <
typename Message>
646 using TransportEntries = std::vector<TransportEntry<Message>>;
648 template <
typename Tuple>
struct MCGen;
649 template <
typename ...Messages>
651 using type = std::tuple<TransportEntries<Messages> ...>;
656 MsgConduits msgConduits_;
657 using Threads = std::vector<std::thread>;
660 typename std::conditional<std::tuple_size<Requests>::value
667 template <
typename MsgConduits,
typename DeliverPred,
typename Tuple>
669 void operator()(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&){}
673 template <
bool careMsg,
size_t i,
typename M,
typename MsgConduits,
typename DeliverPred>
674 typename std::enable_if<careMsg, void>::type
675 doOrNot(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred) {
676 auto& entries = std::get<i>(msgConduits);
680 template <
bool careMsg,
size_t i,
typename M,
typename MsgConduits,
typename DeliverPred>
681 typename std::enable_if<!careMsg, void>::type
682 doOrNot(MsgConduits&, std::shared_ptr<Transport>, DeliverPred&&) {
686 template <
typename MsgConduits,
typename DeliverPred,
typename M,
typename ...Messages>
688 void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t, DeliverPred&& pred){
690 createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
691 msgConduits, t, pred);
692 setupConduit<MsgConduits, DeliverPred, std::tuple<Messages...>>()
693 (msgConduits,t, std::forward<DeliverPred>(pred));
697 template <
typename MsgConduits,
typename M,
typename ...Messages>
699 void operator()(MsgConduits& msgConduits, std::shared_ptr<Transport> t,
void*){
701 createEntry().template doOrNot<i != std::tuple_size<Interests>::value, i, M>(
702 msgConduits, t, [](M
const&){
return true;});
703 setupConduit<MsgConduits,
void*, std::tuple<Messages...>>()
704 (msgConduits, t,
nullptr);
708 template <
typename Message>
711 using M =
typename std::decay<Message>::type;
715 match = index < std::tuple_size<MsgConduits>::value,
717 justbytes = justbytes_index < std::tuple_size<MsgConduits>::value
718 && std::is_trivially_destructible<M>::value? 1 :0,
722 template <
typename Client,
typename RunOnceFunc>
725 if (buffer ==
nullptr) {
726 HMBDC_LOG_ONCE(HMBDC_LOG_D(
"starting a Client that cannot receive messages, Client typid()=" 727 ,
typeid(
Client).name());)
737 char const* schedule;
746 auto cpuAffinityMask = mask;
747 std::tie(schedule, priority) = c.
schedSpec();
749 if (!schedule) schedule =
"SCHED_OTHER";
751 os::configureCurrentThread(name.c_str(), cpuAffinityMask
752 , schedule, priority);
754 }
catch (std::exception
const& e) {
765 while(!stopped_ && runOnceFunc(this->stopped_, buffer, c, maxBlockingTime)) {
767 if (this->stopped_) c.dropped();
775 template <MessageC Message>
776 typename std::enable_if<can_handle<Message>::justbytes,
void>::type
777 justBytesSend(Message&& m) {
778 using M =
typename std::decay<Message>::type;
780 auto& entries = std::get<i>(msgConduits_);
781 for (
auto i = 0u; i < entries.size(); ++i) {
782 auto& e = entries[i];
783 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
784 e.transport->buffer.template putInPlace<MessageWrap<M>>(m);
786 HMBDC_THROW(std::out_of_range
787 ,
"need to increase maxItemSize in start() to >= " 793 template <MessageC Message>
794 typename std::enable_if<!can_handle<Message>::justbytes,
void>::type
795 justBytesSend(Message&& m) {
798 template <MessageC Message,
typename ... Args>
799 typename std::enable_if<can_handle<Message>::justbytes,
void>::type
800 justBytesSendInPlace(Args&& ...args) {
801 using M =
typename std::decay<Message>::type;
803 auto& entries = std::get<i>(msgConduits_);
804 for (
auto i = 0u; i < entries.size(); ++i) {
805 auto& e = entries[i];
806 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
807 e.transport->buffer.template putInPlace<MessageWrap<M>>(args...);
809 HMBDC_THROW(std::out_of_range
810 ,
"need to increase maxItemSize in start() to >= " 816 template <MessageC Message,
typename ... Args>
817 typename std::enable_if<!can_handle<Message>::justbytes,
void>::type
818 justBytesSendInPlace(Args&&... args) {
821 template <MessageC Message>
822 typename std::enable_if<can_handle<Message>::justbytes,
bool>::type
824 using M =
typename std::decay<Message>::type;
826 auto& entries = std::get<i>(msgConduits_);
827 for (
auto i = 0u; i < entries.size(); ++i) {
828 auto& e = entries[i];
829 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
834 HMBDC_THROW(std::out_of_range
835 ,
"need to increase maxItemSize in start() to >= " 841 template <MessageC Message>
842 typename std::enable_if<!can_handle<Message>::justbytes,
bool>::type
847 template <MessageC Message,
typename ... Args>
848 typename std::enable_if<can_handle<Message>::justbytes,
bool>::type
849 justBytesTrySendInPlace(Args&&... args) {
850 using M =
typename std::decay<Message>::type;
852 auto& entries = std::get<i>(msgConduits_);
853 for (
auto i = 0u; i < entries.size(); ++i) {
854 auto& e = entries[i];
855 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
856 if (!e.transport->buffer.template
861 HMBDC_THROW(std::out_of_range
862 ,
"need to increase maxItemSize in start() to >= " 868 template <MessageC Message,
typename ... Args>
869 typename std::enable_if<!can_handle<Message>::justbytes,
bool>::type
870 justBytesTrySendInPlace(Args&&... args) {
874 template <MessageForwardIterC ForwardIt>
875 typename std::enable_if<can_handle<decltype(*(ForwardIt()))>::justbytes,
void>::type
876 justBytesSend(ForwardIt begin,
size_t n) {
877 using Message = decltype(*begin);
878 using M =
typename std::decay<Message>::type;
880 auto& entries = std::get<i>(msgConduits_);
881 for (
auto i = 0u; i < entries.size(); ++i) {
882 auto& e = entries[i];
884 if (e.transport->buffer.maxItemSize() >=
sizeof(
MessageWrap<M>)) {
885 while (!e.transport->buffer.template tryPutBatchInPlace<
MessageWrap<M>>(begin, n)) {}
887 HMBDC_THROW(std::out_of_range
888 ,
"need to increase maxItemSize in start() to >= " 894 template <MessageForwardIterC ForwardIt>
895 typename std::enable_if<!can_handle<decltype(*(ForwardIt()))>::justbytes,
void>::type
896 justBytesSend(ForwardIt begin,
size_t n) {
Definition: BlockingBuffer.hpp:16
Definition: MetaUtils.hpp:80
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:75
bool requestReply(RequestT &&request, Sender &sender, ReplyContext &ctx, ReplyHandler &replyHandler, time::Duration timeout)
not an interface exposed to end user
Definition: RequestReply.hpp:142
std::enable_if< can_handle< decltype(*(ForwardIt()))>::match, void >::type send(ForwardIt begin, size_t n)
send a range of messages via the BlockingContext
Definition: BlockingContext.hpp:544
Definition: BlockingContext.hpp:633
Definition: BlockingContext.hpp:38
std::tuple< char const *, int > schedSpec() const
an overrideable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:85
Definition: Timers.hpp:66
Definition: BlockingContext.hpp:648
Definition: MetaUtils.hpp:44
std::enable_if< can_handle< Message >::match, void >::type send(Message &&m)
send a message to the BlockingContext to dispatch
Definition: BlockingContext.hpp:372
Definition: BlockingContext.hpp:640
Unknown excpetion.
Definition: Exception.hpp:17
Definition: BlockingContext.hpp:183
std::enable_if< can_handle< Message >::match, void >::type sendInPlace(Args &&... args)
send a message by directly constructing the message in receiving message queue
Definition: BlockingContext.hpp:411
Definition: BlockingContext.hpp:210
a std tuple holding REPLY messages types it can dispatch
BlockingContext(typename std::enable_if< std::tuple_size< U >::value==0, void >::type *=nullptr)
trivial ctor
Definition: BlockingContext.hpp:219
void stopped(std::exception const &e) noexcept
the following are for internal use, don't change or override
Definition: Client.hpp:182
A BlockingContext is like a media object that facilitates the communications for the Clients that it ...
Definition: BlockingContext.hpp:180
Definition: Message.hpp:78
Definition: BlockingContext.hpp:672
bool requestReply(REQUEST< M > &&request, ReplyHandler &replyHandler, time::Duration timeout=time::Duration::seconds(0xffffffff))
part of synchronous request reply interface. The caller is blocked until the replies to this request ...
Definition: BlockingContext.hpp:600
void start(Client &c, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename Client::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a client within the Context. The client is powered by a single OS thread.
Definition: BlockingContext.hpp:275
Context template parameter inidcating each message is sent to all clients within the Context...
Definition: Context.hpp:57
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:477
void start(LoadSharingClientPtrIt begin, LoadSharingClientPtrIt end, size_t capacity=1024, size_t maxItemSize=max_size_in_tuple< typename std::decay< decltype(**LoadSharingClientPtrIt())>::type::Interests >::value, uint64_t cpuAffinity=0, time::Duration maxBlockingTime=time::Duration::seconds(1), DeliverPred &&pred=DeliverPred())
start a group of clients within the Context that collectively processing messages in a load sharing m...
Definition: BlockingContext.hpp:317
void join()
wait until all threads of the Context exit
Definition: BlockingContext.hpp:355
void stop()
stop the message dispatching - asynchronously
Definition: BlockingContext.hpp:346
Definition: Message.hpp:112
Definition: BlockingContext.hpp:668
virtual void messageDispatchingStartedCb(uint16_t threadSerialNumber)
called before any messages got dispatched - only once
Definition: Client.hpp:113
Definition: MetaUtils.hpp:96
std::enable_if< can_handle< Message >::match, bool >::type trySend(Message &&m, time::Duration timeout=time::Duration::seconds(0))
best effort to send a message to via the BlockingContext
Definition: BlockingContext.hpp:450
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
Exception that just has an exit code.
Definition: Exception.hpp:28
Definition: BlockingBuffer.hpp:253
a std tuple holding messages types it can dispatch except REPLYs
Definition: BlockingContext.hpp:183
std::enable_if< can_handle< Message >::match, bool >::type trySendInPlace(Args &&... args)
best effort to send a message by directly constructing the message in receiving message queue ...
Definition: BlockingContext.hpp:500
Definition: BlockingContext.hpp:25
a std tuple holding REQUEST messages types it can dispatch
template that convert a regular message to be a request used in request / reply sync messaging - see ...
Definition: Message.hpp:339
void sendReply(ReplyT &&reply)
part of synchronous request reply interface. Send a constructed REPLY to the Context, it will only reach to the origianl sender Client. ALL REPLY need to sent out using this interface to avoid undefined behavior.
Definition: BlockingContext.hpp:622