hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Client.hpp"
4 #include "hmbdc/app/Logger.hpp"
5 #include "hmbdc/tips/rmcast/Transport.hpp"
6 #include "hmbdc/tips/rmcast/BackupSendServer.hpp"
7 #include "hmbdc/tips/rmcast/Messages.hpp"
8 #include "hmbdc/tips/rmcast/McSendTransport.hpp"
9 #include "hmbdc/tips/rmcast/DefaultUserConfig.hpp"
10 
11 #include "hmbdc/time/Time.hpp"
12 #include "hmbdc/time/Rater.hpp"
13 #include "hmbdc/numeric/BitMath.hpp"
14 #include "hmbdc/MetaUtils.hpp"
15 
16 #include <algorithm>
17 #include <memory>
18 #include <tuple>
19 #include <mutex>
20 #include <type_traits>
21 
22 namespace hmbdc { namespace tips { namespace rmcast {
23 
24 namespace sendtransportengine_detail {
25 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
26 
27 using namespace hmbdc::app;
28 using namespace hmbdc::time;
29 using namespace hmbdc::pattern;
30 
31 /**
32  * @brief capture the transportation mechanism
33  *
34  */
37 , TimerManager {
38  /**
39  * @brief ctor
40  * @param cfg JSON specifying the transport - see example, perf-rmcast.cpp
41  * @param maxMessageSize max message size in bytes to be sent
42  * hold the message in buffer until buffer is full to start sending
43  */
44  SendTransport(Config cfg, size_t maxMessageSize);
45 
46  size_t bufferedMessageCount() const {
47  return buffer_.remainingSize();
48  }
49 
50  size_t subscribingPartyDetectedCount(uint16_t tag) const {
51  return outboundSubscriptions_.check(tag);
52  }
53 
54  template <typename MemoryAttachementMessage>
55  typename std::enable_if<std::is_base_of<hasMemoryAttachment
56  , typename std::decay<MemoryAttachementMessage>::type>::value, void>::type
57  queue(MemoryAttachementMessage&& msg) {
58  if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag())) {
59  msg.release();
60  return;
61  }
62  using Message = typename std::decay<MemoryAttachementMessage>::type;
63  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_
64  || sizeof(MemorySeg) > maxMessageSize_
65  || sizeof(StartMemorySegTrain) > maxMessageSize_)) {
66  HMBDC_THROW(std::out_of_range
67  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
68  }
69 
70  // + train header and actual msg
71  size_t n = (msg.hasMemoryAttachment::len + maxMemorySegPayloadSize_ - 1ul) / maxMemorySegPayloadSize_ + 2;
72  if (hmbdc_unlikely(n > buffer_.capacity())) {
73  HMBDC_THROW(std::out_of_range
74  , "send engine buffer too small capacity=" << buffer_.capacity() << "<" << n);
75  }
76  auto it = buffer_.claim(n);
77  queueMemorySegTrain(it, n, forward<MemoryAttachementMessage>(msg));
78  buffer_.commit(it, n);
79  }
80 
81 
82  template <MessageC Message>
83  typename std::enable_if<!std::is_base_of<hasMemoryAttachment, typename std::decay<Message>::type>::value, void>::type
84  queue(Message&& msg) {
85  if (!outboundSubscriptions_.check(msg.getTypeTag())) return;
86  auto n = 1;
87  auto it = buffer_.claim(n);
88  queue(it, forward<Message>(msg));
89  buffer_.commit(it, n);
90  }
91 
92  template <MessageC Message>
93  typename std::enable_if<!std::is_base_of<hasMemoryAttachment, typename std::decay<Message>::type>::value, bool>::type
94  tryQueueMessages(Message&& msg) {
95  if (!outboundSubscriptions_.check(msg.getTypeTag())) return true;
96  auto n = 1;
97  auto it = buffer_.tryClaim(n);
98  if (it) {
99  queue(it, forward<Message>(msg));
100  buffer_.commit(it, n);
101  return true;
102  }
103  return false;
104  }
105 
106  template <typename Message, typename ... Args>
107  void queueInPlace(Args&&... args) {
108  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
109  auto s = buffer_.claim();
110  char* addr = static_cast<char*>(*s);
111  auto h = new (addr) TransportMessageHeader;
112  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
113  new (addr + sizeof(TransportMessageHeader)) MessageWrap<Message>(forward<Args>(args)...);
114  h->messagePayloadLen = sizeof(MessageWrap<Message>);
115  h->setSeq(s.seq_);
116  } else {
117  HMBDC_THROW(std::out_of_range
118  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
119  }
120  buffer_.commit(s);
121  }
122 
123  void queueJustBytes(uint16_t tag, void const* bytes, size_t len
124  , app::hasMemoryAttachment* att) {
125  if (!att) {
126  if (!outboundSubscriptions_.check(tag)) return;
127  auto it = buffer_.claim();
128  auto s = *it;
129  char* addr = static_cast<char*>(s);
130  auto h = new (addr) TransportMessageHeader;
131  if (hmbdc_likely(len <= maxMessageSize_)) {
132  new (addr + sizeof(TransportMessageHeader)) MessageWrap<app::JustBytes>(tag, bytes, len, att);
133  h->messagePayloadLen = sizeof(MessageHead) + len;
134  h->setSeq(it.seq_);
135  } else {
136  HMBDC_THROW(std::out_of_range
137  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
138  }
139  buffer_.commit(it);
140  } else {
141  if (!minRecvToStart_ && !outboundSubscriptions_.check(tag)) {
142  att->release();
143  return;
144  }
145  if (hmbdc_unlikely(len > maxMessageSize_
146  || sizeof(MemorySeg) > maxMessageSize_
147  || sizeof(StartMemorySegTrain) > maxMessageSize_)) {
148  HMBDC_THROW(std::out_of_range
149  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
150  }
151 
152  // + train header and actual msg
153  size_t n = (att->len + maxMemorySegPayloadSize_ - 1ul) / maxMemorySegPayloadSize_ + 2;
154  if (hmbdc_unlikely(n > buffer_.capacity())) {
155  HMBDC_THROW(std::out_of_range
156  , "send engine buffer too small capacity=" << buffer_.capacity() << "<" << n);
157  }
158  auto it = buffer_.claim(n);
159  queueMemorySegTrain(it, n, tag, *att, len);
160  buffer_.commit(it, n);
161  }
162  }
163 
164  void runOnce() {
165  utils::EpollTask::instance().poll();
166  if (hmbdc_unlikely(minRecvToStart_ != std::numeric_limits<size_t>::max()
167  && asyncBackupSendServer_.readySessionCount() >= minRecvToStart_)) {
168  minRecvToStart_ = std::numeric_limits<size_t>::max(); //now started no going back
169  mcSendTransport_.startSend();
170  }
171  asyncBackupSendServer_.runOnce();
172  if (advertisedTypeTagsDirty_) {
173  std::unique_lock<std::mutex> g(advertisedTypeTags_.lock, std::try_to_lock);
174  if (g.owns_lock()) {
175  mcSendTransport_.setAds(asyncBackupSendServer_.advertisingMessages());
176  advertisedTypeTagsDirty_ = false;
177  }
178  }
179  mcSendTransport_.runOnce(asyncBackupSendServer_.sessionCount());
180  }
181 
182  void stop();
183 
184 
185  size_t sessionsRemainingActive() const {
186  return asyncBackupSendServer_.readySessionCount();
187  }
188 
189  template <MessageTupleC Messages, typename Node>
190  void advertiseFor(Node const& node, uint16_t mod, uint16_t res) {
191  std::scoped_lock<std::mutex> g(advertisedTypeTags_.lock);
192  advertisedTypeTags_.addPubsFor<Messages>(node, mod, res);
193  asyncBackupSendServer_.advertisingMessages(advertisedTypeTags_);
194  advertisedTypeTagsDirty_ = true;
195  }
196 
197 private:
198  size_t maxMessageSize_; // size limit the queue functions compare sizeof(Message) / len against
200  Buffer buffer_; // slot buffer the queue functions claim from and commit to
201 
202  Rater rater_; // built from the sendBytesPerSec / sendBytesBurst config values
203  McSendTransport mcSendTransport_; // driven from runOnce(); startSend() is called once enough sessions are ready
204  size_t minRecvToStart_; // ready-session threshold; set to size_t max by runOnce() once sending has started
205  ReoccuringTimer typeTagAdTimer_; // period: typeTagAdvertisePeriodSeconds; marks the ad pending and may call cullSlow()
206  HMBDC_SEQ_TYPE lastBackupSeq_; // buffer_.readSeq(1) as seen by the previous cullSlow()
207  ReoccuringTimer flushTimer_; // period: netRoundtripLatencyMicrosec; marks the seq alert pending
208  uint16_t maxMemorySegPayloadSize_; // chunk size used when splitting an attachment into MemorySeg slots
209  bool waitForSlowReceivers_; // from config; when false the ad timer callback calls cullSlow()
210  reliable::ToCleanupAttQueue toCleanupAttQueue_; // handed to both transports; capacity set to buffer_.capacity() + 10
211  TypeTagSet outboundSubscriptions_; // checked by the queue functions before a slot is claimed
212  AsyncBackupSendServer asyncBackupSendServer_; // run from runOnce(); stopped by stop()
213  bool stopped_; // set to true by stop()
214  std::atomic<bool> advertisedTypeTagsDirty_ = false; // set by advertiseFor(), cleared by runOnce()
215  TypeTagSetST advertisedTypeTags_; // accessed under its own .lock member
216 
217 private:
218  uint16_t
219  outBufferSizePower2();
220 
221  template<typename M>
222  void queueMemorySegTrain(typename Buffer::iterator it, size_t n, M&& m) {
223  using Message = typename std::decay<M>::type;
224  //train head
225  auto s = *it;
226  char* addr = static_cast<char*>(s);
227  auto h = new (addr) TransportMessageHeader;
228  auto& trainHead = (new (addr + sizeof(TransportMessageHeader)) MessageWrap<StartMemorySegTrain>)
229  ->template get<StartMemorySegTrain>();
230 
231  trainHead.inbandUnderlyingTypeTag = m.getTypeTag();
232  trainHead.att = m;
233  trainHead.segCount = n - 2;
234  h->messagePayloadLen = sizeof(MessageWrap<StartMemorySegTrain>);
235  h->setSeq(it++.seq_);
236 
237  //train body
238  {
239  auto totalLen = (size_t)m.hasMemoryAttachment::len;
240  char* base = (char*)m.hasMemoryAttachment::attachment;
241  decltype(totalLen) offset = 0;
242  while(totalLen > offset) {
243  auto s = *it;
244  char* addr = static_cast<char*>(s);
245  auto h = new (addr) TransportMessageHeader;
246  auto& seg = (new (addr + sizeof(TransportMessageHeader)) MessageWrap<MemorySeg>)
247  ->template get<MemorySeg>();
248  seg.seg = base + offset;
249  seg.inbandUnderlyingTypeTag = m.getTypeTag();
250  auto l = (uint16_t)std::min((size_t)maxMemorySegPayloadSize_, totalLen - offset);
251  seg.len = l;
252  h->messagePayloadLen = sizeof(MessageHead) + l; //skipped bytes
253  h->setSeq(it++.seq_);
254  offset += l;
255  }
256  }
257 
258  //actual message
259  {
260  auto s = *it;
261  char* addr = static_cast<char*>(s);
262  auto h = new (addr) TransportMessageHeader;
263  new (addr + sizeof(TransportMessageHeader)) MessageWrap<Message>(std::forward<M>(m));
264  h->messagePayloadLen = sizeof(MessageWrap<Message>);
265  h->flag = app::hasMemoryAttachment::flag;
266  h->setSeq(it++.seq_);
267  }
268  }
269 
270  void queueMemorySegTrain(typename Buffer::iterator it, size_t n
271  , uint16_t tag, hasMemoryAttachment const& m, size_t len);
272 
273  template<typename M, typename ... Messages>
274  void queue(typename Buffer::iterator it
275  , M&& m, Messages&&... msgs) {
276  using Message = typename std::decay<M>::type;
277  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
278  auto s = *it;
279  char* addr = static_cast<char*>(s);
280  auto h = new (addr) TransportMessageHeader;
281  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
282  new (addr + sizeof(TransportMessageHeader)) MessageWrap<Message>(std::forward<M>(m));
283  h->messagePayloadLen = sizeof(MessageWrap<Message>);
284  h->setSeq(it.seq_);
285  } else {
286  HMBDC_THROW(std::out_of_range
287  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
288  }
289  if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
290  h->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
291  }
292  queue(++it, forward<Messages>(msgs)...);
293  }
294 
295  void queue(Buffer::iterator it) {}
296 
297  void cullSlow();
298 };
299 
302 , Client<SendTransportEngine> {
304 
305  void rotate() {
306  if (runLock_.try_lock()) {
307  this->checkTimers(time::SysTime::now());
309  runLock_.unlock();
310  }
311  }
312 
313  /*virtual*/
314  void messageDispatchingStartedCb(size_t const*) override {
315  runLock_.lock();
316  }
317 
318 
319  /*virtual*/
320  void invokedCb(size_t) HMBDC_RESTRICT override {
321  this->runOnce();
322  }
323 
324  /*virtual*/
325  bool droppedCb() override {
326  stop();
327  return true;
328  };
329 
330  using EngineTransport::hmbdcName;
331 
332  tuple<char const*, int> schedSpec() const {
333  return make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
334  }
335 
337  this->runLock_.unlock();
338  }
339 
340 private:
341  std::mutex runLock_;
342 };
343 
344 } //sendtransportengine_detail
345 using SendTransport = sendtransportengine_detail::SendTransport;
346 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
347 }}}
348 
349 namespace hmbdc { namespace tips { namespace rmcast {
350 
351 namespace sendtransportengine_detail {
352 using namespace hmbdc::time;
353 
354 using namespace hmbdc::pattern;
355 using namespace std;
356 
357 inline
360  , size_t maxMessageSize)
361 : EngineTransport((cfg.setAdditionalFallbackConfig(Config(DefaultUserConfig))
362  , cfg.resetSection("tx", false)))
363 , maxMessageSize_(maxMessageSize)
364 , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(MessageHead)
365  , outBufferSizePower2())
366 , rater_(Duration::seconds(1u)
367  , config_.getExt<size_t>("sendBytesPerSec")
368  , config_.getExt<size_t>("sendBytesBurst")
369  , config_.getExt<size_t>("sendBytesBurst") != 0ul)
370 , mcSendTransport_(config_, maxMessageSize, buffer_, rater_, toCleanupAttQueue_)
371 , minRecvToStart_(config_.getExt<size_t>("minRecvToStart"))
372 , typeTagAdTimer_(Duration::seconds(config_.getExt<uint32_t>("typeTagAdvertisePeriodSeconds")))
373 , lastBackupSeq_(numeric_limits<HMBDC_SEQ_TYPE>::max())
374 , flushTimer_(Duration::microseconds(config_.getExt<uint32_t>("netRoundtripLatencyMicrosec")))
375 , maxMemorySegPayloadSize_(min(mtu_ - sizeof(TransportMessageHeader) - sizeof(MessageHead)
376  , TransportMessageHeader::maxPayloadSize() - sizeof(MessageHead))
377 )
378 , waitForSlowReceivers_(config_.getExt<bool>("waitForSlowReceivers"))
379 , asyncBackupSendServer_(config_, buffer_, rater_, toCleanupAttQueue_, outboundSubscriptions_)
380 , stopped_(false) {
381  if (maxMessageSize_ > mtu_) {
382  HMBDC_THROW(std::out_of_range, "mtu needs to >= " << maxMessageSize_);
383  }
384 
385  if (maxMessageSize_ > TransportMessageHeader::maxPayloadSize()) {
386  HMBDC_THROW(std::out_of_range
387  , "maxMessageSize_ needs to <=" << TransportMessageHeader::maxPayloadSize());
388  }
389  auto sendBytesBurst = config_.getExt<size_t>("sendBytesBurst");
390  auto sendBytesBurstMin = maxMessageSize + sizeof(TransportMessageHeader);
391  if (sendBytesBurst && sendBytesBurst < sendBytesBurstMin) {
392  HMBDC_THROW(std::out_of_range, "sendBytesBurst needs to >= " << sendBytesBurstMin);
393  }
394 
395  toCleanupAttQueue_.set_capacity(buffer_.capacity() + 10);
396  typeTagAdTimer_.setCallback(
397  [this](TimerManager& tm, SysTime const& now) {
398  mcSendTransport_.setAdPending();
399  if (!waitForSlowReceivers_) cullSlow();
400  }
401  );
402  schedule(SysTime::now(), typeTagAdTimer_);
403 
404  flushTimer_.setCallback(
405  [this](TimerManager& tm, SysTime const& now) {
406  mcSendTransport_.setSeqAlertPending();
407  }
408  );
409  schedule(SysTime::now(), flushTimer_);
410 }
411 
412 inline
413 void
414 SendTransport::
415 stop() {
416  asyncBackupSendServer_.stop();
417  buffer_.reset(0);
418  buffer_.reset(1);
419  stopped_ = true;
420 };
421 
422 inline
423 uint16_t
424 SendTransport::
425 outBufferSizePower2() {
426  auto res = config_.getExt<uint16_t>("outBufferSizePower2");
427  if (res) {
428  return res;
429  }
430  res =hmbdc::numeric::log2Upper(128ul * 1024ul / (8ul + maxMessageSize_));
431  HMBDC_LOG_N("auto set --outBufferSizePower2=", res);
432  return res;
433 }
434 
435 inline
436 void
437 SendTransport::
438 queueMemorySegTrain(typename Buffer::iterator it, size_t n
439  , uint16_t tag, hasMemoryAttachment const& m, size_t len) {
440  //train head
441  auto s = *it;
442  char* addr = static_cast<char*>(s);
443  auto h = new (addr) TransportMessageHeader;
444  auto& trainHead = (new (addr + sizeof(TransportMessageHeader)) MessageWrap<StartMemorySegTrain>)
445  ->template get<StartMemorySegTrain>();
446  trainHead.inbandUnderlyingTypeTag = tag;
447  trainHead.segCount = n - 2;
448  trainHead.att = m;
449  h->messagePayloadLen = sizeof(MessageWrap<StartMemorySegTrain>);
450  h->setSeq(it++.seq_);
451 
452  //train body
453  {
454  auto totalLen = (size_t)m.len;
455  char* base = (char*)m.attachment;
456  decltype(totalLen) offset = 0;
457  while(totalLen > offset) {
458  auto s = *it;
459  char* addr = static_cast<char*>(s);
460  auto h = new (addr) TransportMessageHeader;
461  auto& seg = (new (addr + sizeof(TransportMessageHeader)) MessageWrap<MemorySeg>)
462  ->template get<MemorySeg>();
463  seg.seg = base + offset;
464  auto l = (uint16_t)min((size_t)maxMemorySegPayloadSize_, totalLen - offset);
465  seg.len = l;
466  seg.inbandUnderlyingTypeTag = tag;
467  h->messagePayloadLen = sizeof(MessageHead) + l; //skipped bytes
468  h->setSeq(it++.seq_);
469  offset += l;
470  }
471  }
472 
473  //actual message
474  {
475  auto s = *it;
476  char* addr = static_cast<char*>(s);
477  auto h = new (addr) TransportMessageHeader;
478  new (addr + sizeof(TransportMessageHeader)) MessageWrap<app::JustBytes>(
479  tag, &m, len, &m);
480  h->messagePayloadLen = sizeof(MessageHead) + len;
481  h->flag = app::hasMemoryAttachment::flag;
482  h->setSeq(it++.seq_);
483  }
484 }
485 
486 inline
487 void
488 SendTransport::
489 cullSlow() {
490  if (hmbdc_likely(minRecvToStart_ == std::numeric_limits<size_t>::max())) {
491  auto seq = buffer_.readSeq(1);
492  if (seq == lastBackupSeq_ && buffer_.isFull()) {
493  asyncBackupSendServer_.killSlowestSession();
494  }
495  lastBackupSeq_ = seq;
496  }
497 }
498 } //sendtransportengine_detail
499 }}}
500 
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:320
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
void messageDispatchingStartedCb(size_t const *) override
called before any messages got dispatched - only once
Definition: SendTransportEngine.hpp:314
Definition: BlockingBuffer.hpp:11
Definition: Message.hpp:212
Definition: Message.hpp:405
Definition: Time.hpp:14
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:325
void schedule(SysTime fireAt, Timer &timer)
schedule the timer to start at a specific time
Definition: Timers.hpp:79
Definition: TypeTagSet.hpp:141
Definition: Message.hpp:263
Definition: Time.hpp:134
a Node is a thread of execution that can subscribe and receive Messages
Definition: Node.hpp:51
Definition: Rater.hpp:10
SendTransport(Config cfg, size_t maxMessageSize)
ctor
Definition: SendTransportEngine.hpp:359
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: TypeTagSet.hpp:201
Definition: Rater.hpp:11
Definition: Message.hpp:418
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Base.hpp:12
capture the transportation mechanism
Definition: SendTransportEngine.hpp:35
Definition: Timers.hpp:117