1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/SendServer.hpp" 6 #include "hmbdc/app/tcpcast/Messages.hpp" 7 #include "hmbdc/app/udpcast/SendTransportEngine.hpp" 8 #include "hmbdc//MetaUtils.hpp" 9 #include "hmbdc/time/Time.hpp" 10 #include "hmbdc/time/Rater.hpp" 11 #include "hmbdc/numeric/BitMath.hpp" 13 #include <boost/circular_buffer.hpp> 17 #include <type_traits> 20 namespace hmbdc {
namespace app {
namespace tcpcast {
22 namespace send_detail {
25 using pattern::MonoLockFreeBuffer;
32 using ptr = std::shared_ptr<SendTransport>;
39 template <
typename... Messages>
40 void queue(
Topic const& t, Messages&&... msgs) {
41 auto n =
sizeof...(msgs);
42 auto it = buffer_.claim(n);
43 queue(it, t, std::forward<Messages>(msgs)...);
44 buffer_.commit(it, n);
47 template <
typename... Messages>
48 bool tryQueue(
Topic const& t, Messages&&... msgs) {
49 auto n =
sizeof...(msgs);
50 auto it = buffer_.tryClaim(n);
52 queue(it, t, std::forward<Messages>(msgs)...);
53 buffer_.commit(it, n);
59 template <
typename Message,
typename ... Args>
60 void queueInPlace(
Topic const& t, Args&&... args) {
61 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
62 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
64 ,
"hasMemoryAttachment has to the first base for Message");
65 auto s = buffer_.claim();
66 char* addr =
static_cast<char*
>(*s);
68 h->flag = calculateFlag<Message>();
72 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
75 HMBDC_THROW(std::out_of_range
76 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
81 void queueBytes(
Topic const& t, uint16_t tag,
void const* bytes,
size_t len);
85 bool match(
Topic const& t)
const {
86 return std::regex_match(t.c_str(), topicRegex_);
91 std::regex topicRegex_;
94 size_t maxMessageSize_;
95 size_t minRecvToStart_;
98 unique_ptr<udpcast::SendTransport> mcSendTransport_;
104 template <
typename Message>
106 uint8_t calculateFlag() {
107 if (is_base_of<hasMemoryAttachment, Message>::value)
return hasMemoryAttachment::flag;
111 template<
typename M,
typename ... Messages>
113 , M&& m, Messages&&... msgs) {
114 using Message =
typename std::remove_reference<M>::type;
115 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
116 static_assert(!is_base_of<hasMemoryAttachment, Message>::value
118 ,
"hasMemoryAttachment has to the first base for Message");
121 char* addr =
static_cast<char*
>(s);
123 h->flag = calculateFlag<Message>();
127 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
130 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
132 queue(++it, t, std::forward<Messages>(msgs)...);
143 ,
Client<SendTransportEngine> {
145 using SendTransport::hmbdcName;
146 using SendTransport::schedSpec;
156 if (runLock_.try_lock()) {
157 this->checkTimers(time::SysTime::now());
171 mcSendTransport_->runOnce(
true);
187 if (stopped_)
return 0;
188 if (server_ && !minRecvToStart_) {
189 return server_->readySessionCount();
191 return numeric_limits<size_t>::max();
195 this->runLock_.unlock();
200 auto seq = buffer_.readSeq();
201 if (seq == lastSeq_ && buffer_.isFull()) {
202 server_->killSlowestSession();
210 size_t maxSendBatch_;
211 bool waitForSlowReceivers_;
216 unique_ptr<SendServer> server_;
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:169
Definition: MonoLockFreeBuffer.hpp:15
capture the transportation mechanism
Definition: SendTransportEngine.hpp:30
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: SendTransportEngine.hpp:155
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:175
Definition: Timers.hpp:66
Definition: Transport.hpp:69
Definition: MetaUtils.hpp:36
Definition: SendTransportEngine.hpp:139
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:186
Definition: Message.hpp:112
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
called before any messages got dispatched - only once
Definition: SendTransportEngine.hpp:164
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
Definition: Timers.hpp:113
Definition: LockFreeBufferMisc.hpp:89