1 #include "hmbdc/Copyright.hpp"
3 #include "hmbdc/app/udpcast/Transport.hpp"
4 #include "hmbdc/app/udpcast/Messages.hpp"
5 #include "hmbdc/app/Base.hpp"
6 #include "hmbdc/comm/inet/Endpoint.hpp"
7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/time/Rater.hpp"
10 #include "hmbdc/numeric/BitMath.hpp"
17 namespace hmbdc {
namespace app {
namespace udpcast {
19 namespace sendtransportengine_detail {
21 using namespace hmbdc::time;
22 using namespace hmbdc::pattern;
27 using ptr = std::shared_ptr<SendTransport>;
31 bool match(
Topic const& t)
const {
32 return std::regex_match(t.c_str(), topicRegex_);
35 template <
typename... Messages>
36 void queue(
Topic const& t, Messages&&... msgs) {
37 auto n =
sizeof...(msgs);
38 auto it = buffer_.claim(n);
39 queue(it, t, std::forward<Messages>(msgs)...);
40 buffer_.commit(it, n);
43 template <
typename... Messages>
44 bool tryQueue(
Topic const& t, Messages&&... msgs) {
45 auto n =
sizeof...(msgs);
46 auto it = buffer_.tryClaim(n);
48 queue(it, t, std::forward<Messages>(msgs)...);
49 buffer_.commit(it, n);
55 template <
typename Message,
typename ... Args>
56 void queueInPlace(
Topic const& t, Args&&... args) {
57 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
58 auto s = buffer_.claim();
59 char* addr =
static_cast<char*
>(*s);
63 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
67 HMBDC_THROW(std::out_of_range
68 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
73 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len);
75 void runOnce(
bool alwaysPoll =
false) HMBDC_RESTRICT {
77 if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
78 utils::EpollTask::instance().poll();
84 using Message =
typename std::decay<M>::type;
85 char* addr =
static_cast<char*
>(buf);
89 if (hmbdc_likely(
sizeof(Message) <= bufLen)) {
93 HMBDC_THROW(std::out_of_range,
"bufLen too small to hold a message");
102 std::regex topicRegex_;
103 size_t maxMessageSize_;
107 size_t maxSendBatch_;
110 size_t toSendMsgsHead_;
111 size_t toSendMsgsTail_;
112 mmsghdr* toSendPkts_;
113 size_t toSendPktsHead_;
114 size_t toSendPktsTail_;
115 std::vector<comm::inet::Endpoint> udpcastDests_;
118 outBufferSizePower2();
120 template<
typename M,
typename ... Messages>
122 , M&& m, Messages&&... msgs) {
123 using Message =
typename std::decay<M>::type;
124 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
126 char* addr =
static_cast<char*
>(s);
130 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
134 HMBDC_THROW(std::out_of_range
135 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
137 queue(++it, t, std::forward<Messages>(msgs)...);
147 ,
Client<SendTransportEngine> {
148 using SendTransport::SendTransport;
149 using SendTransport::hmbdcName;
150 using SendTransport::schedSpec;
160 if (runLock_.try_lock()) {
181 this->runLock_.unlock();
189 using SendTransport = sendtransportengine_detail::SendTransport;
190 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
Definition: MonoLockFreeBuffer.hpp:15
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: SendTransportEngine.hpp:145
void invokedCb(uint16_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:172
Definition: SendTransportEngine.hpp:25
Definition: Message.hpp:112
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
called only once, before any messages are dispatched
Definition: SendTransportEngine.hpp:167
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:57
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:175
void rotate()
if the user chooses not to have a Context to manage and run the engine, this method can be called from a...
Definition: SendTransportEngine.hpp:159
Definition: LockFreeBufferMisc.hpp:89