1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Client.hpp" 4 #include "hmbdc/app/Logger.hpp" 5 #include "hmbdc/tips/rmcast/Transport.hpp" 6 #include "hmbdc/tips/rmcast/BackupSendServer.hpp" 7 #include "hmbdc/tips/rmcast/Messages.hpp" 8 #include "hmbdc/tips/rmcast/McSendTransport.hpp" 9 #include "hmbdc/tips/rmcast/DefaultUserConfig.hpp" 11 #include "hmbdc/time/Time.hpp" 12 #include "hmbdc/time/Rater.hpp" 13 #include "hmbdc/numeric/BitMath.hpp" 14 #include "hmbdc/MetaUtils.hpp" 20 #include <type_traits> 22 namespace hmbdc {
namespace tips {
namespace rmcast {
24 namespace sendtransportengine_detail {
// Presumably generates the has_hmbdc_net_queued_ts<T> trait that the variadic
// queue() helper further down tests with `if constexpr` — TODO confirm against
// the macro definition, which is not part of this listing.
25 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
/// @return whatever buffer_.remainingSize() reports for the outgoing buffer.
// NOTE(review): the closing brace of this function is not part of this listing.
46 size_t bufferedMessageCount()
const {
47 return buffer_.remainingSize();
/// @param tag message type tag to look up
/// @return the result of outboundSubscriptions_.check(tag), converted to size_t.
// NOTE(review): presumably a subscriber count for the tag — confirm against check().
50 size_t subscribingPartyDetectedCount(uint16_t tag)
const {
51 return outboundSubscriptions_.check(tag);
/// Queue a message that carries a memory attachment.
/// The attachment is sent as a "train" of n buffer slots that are claimed,
/// filled by queueMemorySegTrain() and committed together.
/// @param msg message to send (forwarded to queueMemorySegTrain)
/// @throws std::out_of_range when the message type does not fit maxMessageSize_
///         or when the train needs more slots than the buffer's capacity.
// NOTE(review): several original lines (55, 59-61, 64-65, ...) are missing from
// this listing, including the start of the enable_if return type and the
// closing braces.
54 template <
typename MemoryAttachementMessage>
56 ,
typename std::decay<MemoryAttachementMessage>::type>::value,
void>::type
57 queue(MemoryAttachementMessage&& msg) {
// Guard for "minRecvToStart_ is 0 and nothing is recorded for this type tag".
// The guarded body is not shown; presumably an early return like the plain
// queue() overload — TODO confirm.
58 if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag())) {
62 using Message =
typename std::decay<MemoryAttachementMessage>::type;
// Size check; the rest of the condition is truncated in this listing.
63 if (hmbdc_unlikely(
sizeof(Message) > maxMessageSize_
66 HMBDC_THROW(std::out_of_range
67 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
// Slots needed: ceil(attachment length / maxMemorySegPayloadSize_) payload
// segments plus 2 extra slots (queueMemorySegTrain stores segCount = n - 2).
71 size_t n = (msg.hasMemoryAttachment::len + maxMemorySegPayloadSize_ - 1ul) / maxMemorySegPayloadSize_ + 2;
// The whole train must fit into the buffer at once.
72 if (hmbdc_unlikely(n > buffer_.capacity())) {
73 HMBDC_THROW(std::out_of_range
74 ,
"send engine buffer too small capacity=" << buffer_.capacity() <<
"<" << n);
// Claim n slots, fill them, then publish all n with a single commit.
76 auto it = buffer_.claim(n);
77 queueMemorySegTrain(it, n, forward<MemoryAttachementMessage>(msg));
78 buffer_.commit(it, n);
/// Queue a message without memory attachment (overload selected by enable_if
/// when the decayed Message type does not derive from hasMemoryAttachment).
/// @param msg message to send; does nothing when
///        outboundSubscriptions_.check() is false for its type tag.
// NOTE(review): `n` is used below but its definition (original line 86) is
// missing from this listing, as is the closing brace.
82 template <MessageC Message>
83 typename std::enable_if<!std::is_base_of<hasMemoryAttachment, typename std::decay<Message>::type>::value,
void>::type
84 queue(Message&& msg) {
85 if (!outboundSubscriptions_.check(msg.getTypeTag()))
return;
// Claim n slots, write the message via the iterator-based queue() helper, commit.
87 auto it = buffer_.claim(n);
88 queue(it, forward<Message>(msg));
89 buffer_.commit(it, n);
/// Variant of queue() that uses buffer_.tryClaim() instead of buffer_.claim().
/// @param msg message to send
/// @return true when outboundSubscriptions_.check() is false for the type tag
///         (nothing needs to be sent); the other return paths are not visible
///         in this listing.
// NOTE(review): original lines 96, 98 and 101+ are missing here — the
// definition of `n`, the check of the tryClaim() result and the remaining
// return statements cannot be seen.
92 template <MessageC Message>
93 typename std::enable_if<!std::is_base_of<hasMemoryAttachment, typename std::decay<Message>::type>::value,
bool>::type
94 tryQueueMessages(Message&& msg) {
95 if (!outboundSubscriptions_.check(msg.getTypeTag()))
return true;
97 auto it = buffer_.tryClaim(n);
99 queue(it, forward<Message>(msg));
100 buffer_.commit(it, n);
/// Queue a message of type Message using a slot claimed from buffer_.
/// @tparam Message message type; must be trivially destructible (static_assert)
/// @tparam Args    argument types supplied by the caller
/// @param args     arguments supplied by the caller; how they are consumed is
///                 on lines missing from this listing (presumably forwarded to
///                 Message's constructor — TODO confirm)
/// @throws std::out_of_range in the branch that follows the
///         sizeof(Message) <= maxMessageSize_ check.
// NOTE(review): original lines 111, 113-116 and 119+ (the placement of the
// message, the else branch header and the commit) are missing from this listing.
106 template <
typename Message,
typename ... Args>
107 void queueInPlace(Args&&... args) {
108 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
// `s` is what claim() returns; it is dereferenced to get the slot's address.
109 auto s = buffer_.claim();
110 char* addr =
static_cast<char*
>(*s);
112 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
117 HMBDC_THROW(std::out_of_range
118 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
/// Queue a message given only as a type tag plus raw bytes.
/// @param tag   message type tag
/// @param bytes start of the message bytes
/// @param len   number of bytes; checked against maxMessageSize_
/// @throws std::out_of_range when len exceeds maxMessageSize_, or when an
///         attachment train needs more slots than buffer_.capacity().
// NOTE(review): the parameter list is truncated (original lines 124-125 are
// missing). The second half of the body uses `att`, presumably an attachment
// pointer parameter declared on the missing lines — TODO confirm. The text from
// original line 141 on looks like the "has attachment" branch of this same
// function, but the branching lines (138-140) are not in this listing.
123 void queueJustBytes(uint16_t tag,
void const* bytes,
size_t len
// --- path using a single buffer slot ---
126 if (!outboundSubscriptions_.check(tag))
return;
127 auto it = buffer_.claim();
129 char* addr =
static_cast<char*
>(s);
131 if (hmbdc_likely(len <= maxMessageSize_)) {
136 HMBDC_THROW(std::out_of_range
137 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
// --- path using a train of n slots built from *att ---
// Same guard as the attachment overload of queue(): minRecvToStart_ is 0 and
// nothing is recorded for the tag. The guarded body is not shown.
141 if (!minRecvToStart_ && !outboundSubscriptions_.check(tag)) {
145 if (hmbdc_unlikely(len > maxMessageSize_
148 HMBDC_THROW(std::out_of_range
149 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
// ceil(att->len / maxMemorySegPayloadSize_) payload segments plus 2 extra slots.
153 size_t n = (att->len + maxMemorySegPayloadSize_ - 1ul) / maxMemorySegPayloadSize_ + 2;
154 if (hmbdc_unlikely(n > buffer_.capacity())) {
155 HMBDC_THROW(std::out_of_range
156 ,
"send engine buffer too small capacity=" << buffer_.capacity() <<
"<" << n);
// Claim, fill and commit the n slots as one unit.
158 auto it = buffer_.claim(n);
159 queueMemorySegTrain(it, n, tag, *att, len);
160 buffer_.commit(it, n);
// NOTE(review): the signature of this function (original line ~164) is missing
// from this listing; presumably this is the engine's per-iteration runOnce() —
// TODO confirm.
// One iteration of the send side: poll epoll, start multicast sending once
// enough sessions are ready, service the backup server, refresh advertisements
// if they changed, then run the multicast transport.
165 utils::EpollTask::instance().poll();
// minRecvToStart_ == max() is used as the "already started" marker: once
// readySessionCount() reaches the configured threshold, sending is started and
// the threshold is overwritten with max() so this branch is not taken again.
166 if (hmbdc_unlikely(minRecvToStart_ != std::numeric_limits<size_t>::max()
167 && asyncBackupSendServer_.readySessionCount() >= minRecvToStart_)) {
168 minRecvToStart_ = std::numeric_limits<size_t>::max();
169 mcSendTransport_.startSend();
171 asyncBackupSendServer_.runOnce();
// advertiseFor() sets this flag while holding advertisedTypeTags_.lock; here
// the lock is only tried (std::try_to_lock). The check of whether the lock was
// obtained (original line 174) is missing from this listing.
172 if (advertisedTypeTagsDirty_) {
173 std::unique_lock<std::mutex> g(advertisedTypeTags_.lock, std::try_to_lock);
175 mcSendTransport_.setAds(asyncBackupSendServer_.advertisingMessages());
176 advertisedTypeTagsDirty_ =
false;
179 mcSendTransport_.runOnce(asyncBackupSendServer_.sessionCount());
/// @return asyncBackupSendServer_.readySessionCount().
// NOTE(review): the closing brace is not part of this listing.
185 size_t sessionsRemainingActive()
const {
186 return asyncBackupSendServer_.readySessionCount();
/// Register the types in Messages as published for `node` and mark the
/// advertisement as needing a refresh.
/// @tparam Messages tuple of message types passed to addPubsFor<>()
/// @param node node passed through to advertisedTypeTags_.addPubsFor()
/// @param mod  passed through to addPubsFor() unchanged
/// @param res  passed through to addPubsFor() unchanged
// Holds advertisedTypeTags_.lock for the whole update; the dirty flag set at
// the end is the one tested (and cleared) by the code that calls
// mcSendTransport_.setAds().
// NOTE(review): the closing brace is not part of this listing.
189 template <MessageTupleC Messages,
typename Node>
190 void advertiseFor(
Node const& node, uint16_t mod, uint16_t res) {
191 std::scoped_lock<std::mutex> g(advertisedTypeTags_.lock);
192 advertisedTypeTags_.addPubsFor<Messages>(node, mod, res);
193 asyncBackupSendServer_.advertisingMessages(advertisedTypeTags_);
194 advertisedTypeTagsDirty_ =
true;
// NOTE(review): only some of the data members appear in this listing; the
// original lines between them are missing.

// Upper bound that sizeof(Message) / len is checked against before queueing.
198 size_t maxMessageSize_;
// Initialised from the "minRecvToStart" config value; overwritten with
// numeric_limits<size_t>::max() once sending has been started.
204 size_t minRecvToStart_;
// Last value of buffer_.readSeq(1) seen by the slow-receiver check.
206 HMBDC_SEQ_TYPE lastBackupSeq_;
// Maximum payload carried by one MemorySeg slot of an attachment train.
208 uint16_t maxMemorySegPayloadSize_;
// From the "waitForSlowReceivers" config value; when false the type-tag timer
// callback calls cullSlow().
209 bool waitForSlowReceivers_;
// Shared with mcSendTransport_ and asyncBackupSendServer_ (passed to both constructors).
210 reliable::ToCleanupAttQueue toCleanupAttQueue_;
// Set by advertiseFor(); cleared after mcSendTransport_.setAds() has been called.
214 std::atomic<bool> advertisedTypeTagsDirty_ =
false;
// Declaration of outBufferSizePower2(); its return type is on a line missing
// from this listing.
219 outBufferSizePower2();
/// Fill n already-claimed buffer slots with the "train" for a message that has
/// a memory attachment: one StartMemorySegTrain head, a MemorySeg per payload
/// chunk, and a last slot flagged with app::hasMemoryAttachment::flag.
/// @param it first claimed slot; advanced (it++) as each slot is sequenced
/// @param n  total number of claimed slots; n - 2 is stored as the segment count
/// @param m  the message; its hasMemoryAttachment::len / ::attachment describe
///           the payload to split
// NOTE(review): the template header (line 221), the placement-new of each
// header `h`, the `s` definitions, the offset increment and the closing braces
// are on lines missing from this listing.
222 void queueMemorySegTrain(
typename Buffer::iterator it,
size_t n, M&& m) {
223 using Message =
typename std::decay<M>::type;
// --- slot 1: train head ---
226 char* addr =
static_cast<char*
>(s);
229 ->
template get<StartMemorySegTrain>();
231 trainHead.inbandUnderlyingTypeTag = m.getTypeTag();
// n counts every slot of the train; two of them are not payload segments.
233 trainHead.segCount = n - 2;
235 h->setSeq(it++.seq_);
// --- payload segments: walk the attachment in chunks ---
239 auto totalLen = (size_t)m.hasMemoryAttachment::len;
240 char* base = (
char*)m.hasMemoryAttachment::attachment;
241 decltype(totalLen) offset = 0;
242 while(totalLen > offset) {
244 char* addr =
static_cast<char*
>(s);
247 ->
template get<MemorySeg>();
// The segment stores a pointer into the attachment (base + offset).
248 seg.seg = base + offset;
249 seg.inbandUnderlyingTypeTag = m.getTypeTag();
// Chunk length: a full maxMemorySegPayloadSize_, or whatever remains at the tail.
250 auto l = (uint16_t)std::min((
size_t)maxMemorySegPayloadSize_, totalLen - offset);
253 h->setSeq(it++.seq_);
// --- last slot: flagged as carrying a memory attachment ---
261 char* addr =
static_cast<char*
>(s);
265 h->flag = app::hasMemoryAttachment::flag;
266 h->setSeq(it++.seq_);
// Declaration of the non-template queueMemorySegTrain overload. The remaining
// parameters are truncated in this listing; the call in queueJustBytes passes
// (it, n, tag, *att, len).
270 void queueMemorySegTrain(
typename Buffer::iterator it,
size_t n
/// Write one message into the slot at `it`, then recurse for the remaining
/// messages with the iterator advanced by one.
/// @tparam M        type of the first message; its decayed type must be
///                  trivially destructible (static_assert)
/// @param it        claimed slot to write into
/// @param m         message for this slot
/// @param msgs      remaining messages, handled by the recursive call
/// @throws std::out_of_range in the branch that follows the
///         sizeof(Message) <= maxMessageSize_ check.
// NOTE(review): the `s` / `h` definitions, the in-place construction of the
// message and the else header are on lines missing from this listing.
273 template<
typename M,
typename ... Messages>
274 void queue(
typename Buffer::iterator it
275 , M&& m, Messages&&... msgs) {
276 using Message =
typename std::decay<M>::type;
277 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
279 char* addr =
static_cast<char*
>(s);
281 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
286 HMBDC_THROW(std::out_of_range
287 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
// Only for message types that have an hmbdc_net_queued_ts member: stamp the
// wrapped copy with the current system time.
289 if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
290 h->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
// Recurse on the rest of the pack; ends at the empty queue(it) overload.
292 queue(++it, forward<Messages>(msgs)...);
// Terminates the variadic queue(it, m, msgs...) recursion: no messages left, nothing to do.
295 void queue(Buffer::iterator it) {}
// Fragments of the SendTransportEngine struct, which has Client<SendTransportEngine>
// as one of its bases.
// NOTE(review): most of the struct (other bases, constructor, callbacks) is on
// lines missing from this listing; the function that each loose statement
// below belongs to cannot be determined from here.
302 ,
Client<SendTransportEngine> {
// Timers are only checked when runLock_ can be taken without blocking.
306 if (runLock_.try_lock()) {
307 this->checkTimers(time::SysTime::now());
330 using EngineTransport::hmbdcName;
/// @return (scheduling policy C-string, scheduling priority) taken from
///         schedPolicy_ / schedPriority_.
332 tuple<char const*, int> schedSpec()
const {
333 return make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
337 this->runLock_.unlock();
// Re-export the two types from the sendtransportengine_detail namespace under
// shorter names in the namespace this listing is in at this point.
345 using SendTransport = sendtransportengine_detail::SendTransport;
346 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
349 namespace hmbdc {
namespace tips {
namespace rmcast {
351 namespace sendtransportengine_detail {
// Out-of-line definition of the SendTransport constructor
// (SendTransport(Config cfg, size_t maxMessageSize)).
// Reads its settings from the "tx" section of cfg, validates the sizes and
// arms two timers.
// Throws std::out_of_range when maxMessageSize_ exceeds mtu_ or
// TransportMessageHeader::maxPayloadSize(), or when a non-zero
// "sendBytesBurst" is below sendBytesBurstMin.
// NOTE(review): the first line of the signature, some member initialisers, the
// lambda headers of the timer callbacks and all closing braces are on lines
// missing from this listing.
360 ,
size_t maxMessageSize)
362 , cfg.resetSection(
"tx", false)))
363 , maxMessageSize_(maxMessageSize)
365 , outBufferSizePower2())
367 , config_.getExt<size_t>(
"sendBytesPerSec")
368 , config_.getExt<size_t>(
"sendBytesBurst")
369 , config_.getExt<size_t>(
"sendBytesBurst") != 0ul)
370 , mcSendTransport_(config_, maxMessageSize, buffer_, rater_, toCleanupAttQueue_)
371 , minRecvToStart_(config_.getExt<size_t>(
"minRecvToStart"))
372 , typeTagAdTimer_(
Duration::seconds(config_.getExt<uint32_t>(
"typeTagAdvertisePeriodSeconds")))
373 , lastBackupSeq_(numeric_limits<HMBDC_SEQ_TYPE>::max())
374 , flushTimer_(
Duration::microseconds(config_.getExt<uint32_t>(
"netRoundtripLatencyMicrosec")))
378 , waitForSlowReceivers_(config_.getExt<bool>(
"waitForSlowReceivers"))
379 , asyncBackupSendServer_(config_, buffer_, rater_, toCleanupAttQueue_, outboundSubscriptions_)
// A message must fit within mtu_.
381 if (maxMessageSize_ > mtu_) {
382 HMBDC_THROW(std::out_of_range,
"mtu needs to >= " << maxMessageSize_);
// ...and within what a TransportMessageHeader can describe.
385 if (maxMessageSize_ > TransportMessageHeader::maxPayloadSize()) {
386 HMBDC_THROW(std::out_of_range
387 ,
"maxMessageSize_ needs to <=" << TransportMessageHeader::maxPayloadSize());
// A burst of 0 is accepted; only non-zero values are checked against the minimum.
389 auto sendBytesBurst = config_.
getExt<
size_t>(
"sendBytesBurst");
391 if (sendBytesBurst && sendBytesBurst < sendBytesBurstMin) {
392 HMBDC_THROW(std::out_of_range,
"sendBytesBurst needs to >= " << sendBytesBurstMin);
// Cleanup queue is sized to the buffer capacity plus 10.
395 toCleanupAttQueue_.set_capacity(buffer_.capacity() + 10);
// Type-tag advertisement timer: flags an advertisement as pending and, unless
// configured to wait for slow receivers, calls cullSlow().
396 typeTagAdTimer_.setCallback(
398 mcSendTransport_.setAdPending();
399 if (!waitForSlowReceivers_) cullSlow();
402 schedule(SysTime::now(), typeTagAdTimer_);
// Flush timer (period taken from "netRoundtripLatencyMicrosec"): flags a
// sequence alert as pending.
404 flushTimer_.setCallback(
406 mcSendTransport_.setSeqAlertPending();
409 schedule(SysTime::now(), flushTimer_);
// NOTE(review): original line 416 presumably belongs to a separate function
// (its header is not in this listing), not to the constructor — TODO confirm.
416 asyncBackupSendServer_.stop();
// Determine the outgoing buffer size exponent.
// Starts from the "outBufferSizePower2" config value; in the fallback branch
// it is derived from maxMessageSize_ as
// log2Upper(128ul * 1024ul / (8ul + maxMessageSize_)) and the choice is logged.
// NOTE(review): the return type / class qualifier, the condition selecting the
// fallback (original lines 427-429) and the return statement are on lines
// missing from this listing; presumably the fallback applies when the
// configured value is 0 — TODO confirm.
425 outBufferSizePower2() {
426 auto res = config_.
getExt<uint16_t>(
"outBufferSizePower2");
430 res =hmbdc::numeric::log2Upper(128ul * 1024ul / (8ul + maxMessageSize_));
431 HMBDC_LOG_N(
"auto set --outBufferSizePower2=", res);
// Out-of-line definition of the non-template queueMemorySegTrain overload.
// Builds the same slot layout as the template overload: a StartMemorySegTrain
// head, one MemorySeg per payload chunk of m, and a last slot flagged with
// app::hasMemoryAttachment::flag. Here the type tag comes from `tag` rather
// than from the message object.
// NOTE(review): the return type / class qualifier, the parameters after `n`
// (the body uses `tag` and `m`), the `s` definitions, the offset increment and
// all closing braces are on lines missing from this listing.
438 queueMemorySegTrain(
typename Buffer::iterator it,
size_t n
// --- slot 1: train head ---
442 char* addr =
static_cast<char*
>(s);
// A header is constructed in place at the start of the slot.
443 auto h =
new (addr) TransportMessageHeader;
445 ->
template get<StartMemorySegTrain>();
446 trainHead.inbandUnderlyingTypeTag = tag;
// n counts every slot of the train; two of them are not payload segments.
447 trainHead.segCount = n - 2;
450 h->setSeq(it++.seq_);
// --- payload segments: walk m.attachment in chunks ---
454 auto totalLen = (size_t)m.len;
455 char* base = (
char*)m.attachment;
456 decltype(totalLen) offset = 0;
457 while(totalLen > offset) {
459 char* addr =
static_cast<char*
>(s);
460 auto h =
new (addr) TransportMessageHeader;
462 ->
template get<MemorySeg>();
// The segment stores a pointer into the attachment (base + offset).
463 seg.seg = base + offset;
// Chunk length: a full maxMemorySegPayloadSize_, or whatever remains at the tail.
464 auto l = (uint16_t)min((
size_t)maxMemorySegPayloadSize_, totalLen - offset);
466 seg.inbandUnderlyingTypeTag = tag;
468 h->setSeq(it++.seq_);
// --- last slot: flagged as carrying a memory attachment ---
476 char* addr =
static_cast<char*
>(s);
477 auto h =
new (addr) TransportMessageHeader;
481 h->flag = app::hasMemoryAttachment::flag;
482 h->setSeq(it++.seq_);
// NOTE(review): the signature is missing from this listing; presumably this is
// the body of cullSlow(), which the type-tag timer callback calls — TODO confirm.
// Only acts once sending has started (minRecvToStart_ was overwritten with max()).
490 if (hmbdc_likely(minRecvToStart_ == std::numeric_limits<size_t>::max())) {
491 auto seq = buffer_.readSeq(1);
// readSeq(1) has not moved since the previous check and the buffer is full:
// drop the slowest session.
492 if (seq == lastBackupSeq_ && buffer_.isFull()) {
493 asyncBackupSendServer_.killSlowestSession();
// Remember the sequence for the next check.
495 lastBackupSeq_ = seq;
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
class to hold an hmbdc configuration
Definition: Config.hpp:45
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:320
Definition: TypedString.hpp:84
Definition: SendTransportEngine.hpp:300
Definition: Timers.hpp:70
Definition: McSendTransport.hpp:29
void messageDispatchingStartedCb(size_t const *) override
called before any messages got dispatched - only once
Definition: SendTransportEngine.hpp:314
Definition: Transport.hpp:42
Definition: BlockingBuffer.hpp:11
Definition: Message.hpp:212
Definition: BackupSendServerT.hpp:75
Definition: Message.hpp:405
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:325
void schedule(SysTime fireAt, Timer &timer)
schedule the timer to start at a specific time
Definition: Timers.hpp:79
Definition: Message.hpp:263
a Node is a thread of execution that can subscribe and receive Messages
Definition: Node.hpp:51
SendTransport(Config cfg, size_t maxMessageSize)
ctor
Definition: SendTransportEngine.hpp:359
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: Message.hpp:418
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
capture the transportation mechanism
Definition: SendTransportEngine.hpp:35
Definition: Timers.hpp:117