1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/tips/rnetmap/Transport.hpp" 4 #include "hmbdc/tips/rnetmap/BackupSendServer.hpp" 5 #include "hmbdc/tips/rnetmap/Messages.hpp" 6 #include "hmbdc/tips/rnetmap/NmSendTransport.hpp" 7 #include "hmbdc/tips/rnetmap/DefaultUserConfig.hpp" 8 #include "hmbdc/app/Base.hpp" 9 #include "hmbdc/MetaUtils.hpp" 10 #include "hmbdc/time/Time.hpp" 11 #include "hmbdc/time/Rater.hpp" 12 #include "hmbdc/numeric/BitMath.hpp" 17 #include <type_traits> 20 namespace hmbdc {
namespace tips {
namespace rnetmap {
22 namespace sendtransportengine_detail{
23 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
34 size_t bufferedMessageCount()
const {
35 return buffer_.remainingSize();
38 size_t subscribingPartyDetectedCount(uint16_t tag)
const {
39 return outboundSubscriptions_.check(tag);
42 template <
typename Message>
44 ,
typename std::decay<Message>::type>::value,
void>::type
45 queue(Message&& msg) {
46 if (!outboundSubscriptions_.check(msg.getTypeTag()))
return;
48 auto it = buffer_.claim(n);
49 queue(it, std::forward<Message>(msg));
50 buffer_.commit(it, n);
53 template <
typename MemoryAttachementMessage>
55 ,
typename std::decay<MemoryAttachementMessage>::type>::value,
void>::type
56 queue(MemoryAttachementMessage&& msg) {
57 if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag())) {
61 using Message =
typename decay<MemoryAttachementMessage>::type;
62 if (hmbdc_unlikely(
sizeof(Message) > maxMessageSize_
65 HMBDC_THROW(std::out_of_range
66 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
70 size_t n = (msg.hasMemoryAttachment::len + maxMemorySegPayloadSize_ - 1ul) / maxMemorySegPayloadSize_ + 2;
71 if (hmbdc_unlikely(n > buffer_.capacity())) {
72 HMBDC_THROW(std::out_of_range
73 ,
"send engine buffer too small capacity=" << buffer_.capacity() <<
"<" << n);
75 auto it = buffer_.claim(n);
76 queueMemorySegTrain(it, n, std::forward<MemoryAttachementMessage>(msg));
77 buffer_.commit(it, n);
80 template <
typename Message>
82 ,
typename std::decay<Message>::type>::value,
bool>::type
83 tryQueue(Message&& msg) HMBDC_RESTRICT {
84 if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag()))
return true;
86 auto it = buffer_.tryClaim(n);
88 queue(it, std::forward<Message>(msg));
89 buffer_.commit(it, n);
95 template <
typename Message,
typename ... Args>
96 void queueInPlace(Args&&... args) HMBDC_RESTRICT {
97 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
98 if (hmbdc_unlikely(
sizeof(Message) > maxMessageSize_)) {
99 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
101 auto s = buffer_.claim();
102 TransportMessageHeader::copyToInPlace<Message>(*s, std::forward<Args>(args)...)->setSeq(s.seq_);
106 void queueJustBytes(uint16_t tag,
void const* bytes,
size_t len
109 if (!minRecvToStart_ && !outboundSubscriptions_.check(tag))
return;
110 auto it = buffer_.claim();
112 char* addr =
static_cast<char*
>(s);
114 if (hmbdc_likely(len <= maxMessageSize_)) {
119 HMBDC_THROW(std::out_of_range
120 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
124 if (!minRecvToStart_ && !outboundSubscriptions_.check(tag)) {
128 if (hmbdc_unlikely(len > maxMessageSize_
131 HMBDC_THROW(std::out_of_range
132 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
136 size_t n = (att->len + maxMemorySegPayloadSize_ - 1ul) / maxMemorySegPayloadSize_ + 2;
137 if (hmbdc_unlikely(n > buffer_.capacity())) {
138 HMBDC_THROW(std::out_of_range
139 ,
"send engine buffer too small capacity=" << buffer_.capacity() <<
"<" << n);
141 auto it = buffer_.claim(n);
142 queueMemorySegTrain(it, n, tag, *att, len);
143 buffer_.commit(it, n);
148 utils::EpollTask::instance().poll();
149 if (hmbdc_unlikely(minRecvToStart_ != std::numeric_limits<size_t>::max()
150 && asyncBackupSendServer_.readySessionCount() >= minRecvToStart_)) {
151 minRecvToStart_ = std::numeric_limits<size_t>::max();
152 nmSendTransport_.startSend();
154 asyncBackupSendServer_.runOnce();
155 if (advertisedTypeTagsDirty_) {
156 std::unique_lock<std::mutex> g(advertisedTypeTags_.lock, std::try_to_lock);
158 nmSendTransport_.setAds(asyncBackupSendServer_.advertisingMessages());
159 advertisedTypeTagsDirty_ =
false;
162 nmSendTransport_.runOnce(asyncBackupSendServer_.sessionCount());
166 size_t sessionsRemainingActive()
const {
167 return asyncBackupSendServer_.readySessionCount();
170 template <MessageTupleC Messages,
typename Node>
171 void advertiseFor(
Node const& node, uint16_t mod, uint16_t res) {
172 std::scoped_lock<std::mutex> g(advertisedTypeTags_.lock);
173 advertisedTypeTags_.addPubsFor<Messages>(node, mod, res);
174 asyncBackupSendServer_.advertisingMessages(advertisedTypeTags_);
175 advertisedTypeTagsDirty_ =
true;
179 size_t maxMessageSize_;
185 size_t minRecvToStart_;
187 HMBDC_SEQ_TYPE lastBackupSeq_;
189 bool waitForSlowReceivers_;
190 uint16_t maxMemorySegPayloadSize_;
192 reliable::ToCleanupAttQueue toCleanupAttQueue_;
196 std::atomic<bool> advertisedTypeTagsDirty_ =
false;
200 outBufferSizePower2();
202 void queueMemorySegTrain(
typename Buffer::iterator it,
size_t n
206 void queueMemorySegTrain(
typename Buffer::iterator it,
size_t n, M&& m) {
207 using Message =
typename decay<M>::type;
210 char* addr =
static_cast<char*
>(s);
213 ->
template get<StartMemorySegTrain>();
215 trainHead.inbandUnderlyingTypeTag = m.getTypeTag();
217 trainHead.segCount = n - 2;
219 h->setSeq(it++.seq_);
223 auto totalLen = (size_t)m.hasMemoryAttachment::len;
224 char* base = (
char*)m.hasMemoryAttachment::attachment;
225 decltype(totalLen) offset = 0;
226 while(totalLen > offset) {
228 char* addr =
static_cast<char*
>(s);
231 ->
template get<MemorySeg>();
232 seg.seg = base + offset;
233 auto l = (uint16_t)std::min((
size_t)maxMemorySegPayloadSize_, totalLen - offset);
236 h->setSeq(it++.seq_);
244 char* addr =
static_cast<char*
>(s);
248 h->flag = app::hasMemoryAttachment::flag;
249 h->setSeq(it++.seq_);
254 template<
typename M,
typename ... Messages>
255 void queue(
typename Buffer::iterator it
256 , M&& m, Messages&&... msgs) {
257 using Message =
typename decay<M>::type;
258 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
260 char* addr =
static_cast<char*
>(s);
262 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
267 HMBDC_THROW(std::out_of_range
268 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
270 if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
271 h->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
273 queue(++it, std::forward<Messages>(msgs)...);
276 void queue(Buffer::iterator it) {}
282 ,
Client<SendTransportEngine> {
284 using SendTransport::SendTransport;
293 this->checkTimers(time::SysTime::now());
304 HMBDC_LOG_C(e.what());
313 using EngineTransport::hmbdcName;
315 tuple<char const*, int> schedSpec()
const {
316 return make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
321 using SendTransport = sendtransportengine_detail::SendTransport;
322 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
325 namespace hmbdc {
namespace tips {
namespace rnetmap {
327 namespace sendtransportengine_detail{
332 SendTransport(
Config cfgIn
333 ,
size_t maxMessageSize)
334 : EngineTransport((cfgIn.setAdditionalFallbackConfig(
Config(DefaultUserConfig))
335 , cfgIn.resetSection(
"tx", false)))
336 , maxMessageSize_(maxMessageSize)
337 , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(
MessageHead)
338 , outBufferSizePower2())
340 , config_.getExt<size_t>(
"sendBytesPerSec")
341 , config_.getExt<size_t>(
"sendBytesBurst")
342 , config_.getExt<size_t>(
"sendBytesBurst") != 0ul)
343 , nmSendTransport_(config_, maxMessageSize, buffer_, rater_, toCleanupAttQueue_)
344 , minRecvToStart_(config_.getExt<size_t>(
"minRecvToStart"))
345 , typeTagAdTimer_(
Duration::seconds(config_.getExt<uint32_t>(
"typeTagAdvertisePeriodSeconds")))
346 , lastBackupSeq_(
std::numeric_limits<HMBDC_SEQ_TYPE>::max())
347 , flushTimer_(
Duration::microseconds(config_.getExt<uint32_t>(
"netRoundtripLatencyMicrosec")))
348 , waitForSlowReceivers_(config_.getExt<bool>(
"waitForSlowReceivers"))
349 , maxMemorySegPayloadSize_(
std::min(mtu_ - sizeof(TransportMessageHeader) - sizeof(
MessageHead)
350 , TransportMessageHeader::maxPayloadSize() - sizeof(
MessageHead)))
351 , asyncBackupSendServer_(config_, buffer_, rater_, toCleanupAttQueue_, outboundSubscriptions_)
353 auto sendBytesBurst = config_.
getExt<
size_t>(
"sendBytesBurst");
354 auto sendBytesBurstMin = maxMessageSize +
sizeof(TransportMessageHeader);
355 if (sendBytesBurst && sendBytesBurst < sendBytesBurstMin) {
356 HMBDC_THROW(std::out_of_range,
"sendBytesBurst needs to >= " << sendBytesBurstMin);
358 toCleanupAttQueue_.set_capacity(buffer_.capacity() + 10);
360 typeTagAdTimer_.setCallback(
362 nmSendTransport_.setAdPending();
363 if (!waitForSlowReceivers_) cullSlow();
366 schedule(SysTime::now(), typeTagAdTimer_);
368 flushTimer_.setCallback(
370 nmSendTransport_.setSeqAlertPending();
373 schedule(SysTime::now(), flushTimer_);
379 queueMemorySegTrain(
typename Buffer::iterator it,
size_t n
383 char* addr =
static_cast<char*
>(s);
384 auto h =
new (addr) TransportMessageHeader;
386 ->
template get<StartMemorySegTrain>();
387 trainHead.inbandUnderlyingTypeTag = tag;
388 trainHead.segCount = n - 2;
391 h->setSeq(it++.seq_);
395 auto totalLen = (size_t)m.len;
396 char* base = (
char*)m.attachment;
397 decltype(totalLen) offset = 0;
398 while(totalLen > offset) {
400 char* addr =
static_cast<char*
>(s);
401 auto h =
new (addr) TransportMessageHeader;
403 ->
template get<MemorySeg>();
404 seg.seg = base + offset;
405 auto l = (uint16_t)std::min((
size_t)maxMemorySegPayloadSize_, totalLen - offset);
407 seg.inbandUnderlyingTypeTag = tag;
409 h->setSeq(it++.seq_);
417 char* addr =
static_cast<char*
>(s);
418 auto h =
new (addr) TransportMessageHeader;
422 h->flag = app::hasMemoryAttachment::flag;
423 h->setSeq(it++.seq_);
431 asyncBackupSendServer_.stop();
440 outBufferSizePower2() {
441 auto res = config_.
getExt<uint16_t>(
"outBufferSizePower2");
445 res =hmbdc::numeric::log2Upper(512ul * 1024ul / (8ul + maxMessageSize_));
446 HMBDC_LOG_N(
"auto set --outBufferSizePower2=", res);
454 if (hmbdc_likely(minRecvToStart_ == std::numeric_limits<size_t>::max())) {
455 auto seq = buffer_.readSeq(1);
456 if (seq == lastBackupSeq_ && buffer_.isFull()) {
457 asyncBackupSendServer_.killSlowestSession();
459 lastBackupSeq_ = seq;
T getExt(const path_type ¶m, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
class to hold an hmbdc configuration
Definition: Config.hpp:45
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: SendTransportEngine.hpp:303
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:308
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:298
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
Definition: SendTransportEngine.hpp:29
Definition: SendTransportEngine.hpp:280
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: SendTransportEngine.hpp:292
Definition: BlockingBuffer.hpp:11
Definition: Transport.hpp:39
Definition: NmSendTransport.hpp:45
Definition: Message.hpp:212
Definition: BackupSendServerT.hpp:75
Definition: Message.hpp:405
void schedule(SysTime fireAt, Timer &timer)
schedule the timer to start at a specific time
Definition: Timers.hpp:79
Definition: Message.hpp:263
a Node is a thread of execution that can suscribe and receive Messages
Definition: Node.hpp:51
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:122
Definition: Message.hpp:418
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Timers.hpp:117