hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/rnetmap/Transport.hpp"
4 #include "hmbdc/tips/rnetmap/BackupSendServer.hpp"
5 #include "hmbdc/tips/rnetmap/Messages.hpp"
6 #include "hmbdc/tips/rnetmap/NmSendTransport.hpp"
7 #include "hmbdc/tips/rnetmap/DefaultUserConfig.hpp"
8 #include "hmbdc/app/Base.hpp"
9 #include "hmbdc/MetaUtils.hpp"
10 #include "hmbdc/time/Time.hpp"
11 #include "hmbdc/time/Rater.hpp"
12 #include "hmbdc/numeric/BitMath.hpp"
13 
14 #include <algorithm>
15 #include <memory>
16 #include <tuple>
17 #include <type_traits>
18 #include <mutex>
19 
20 namespace hmbdc { namespace tips { namespace rnetmap {
21 
22 namespace sendtransportengine_detail{
23 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
24 using namespace hmbdc::time;
25 using namespace hmbdc::app;
26 
27 using namespace hmbdc::pattern;
28 
31 , TimerManager {
32  SendTransport(Config cfg, size_t maxMessageSize);
33 
34  size_t bufferedMessageCount() const {
35  return buffer_.remainingSize();
36  }
37 
38  size_t subscribingPartyDetectedCount(uint16_t tag) const {
39  return outboundSubscriptions_.check(tag);
40  }
41 
42  template <typename Message>
43  typename std::enable_if<!std::is_base_of<hasMemoryAttachment
44  , typename std::decay<Message>::type>::value, void>::type
45  queue(Message&& msg) {
46  if (!outboundSubscriptions_.check(msg.getTypeTag())) return;
47  auto n = 1;
48  auto it = buffer_.claim(n);
49  queue(it, std::forward<Message>(msg));
50  buffer_.commit(it, n);
51  }
52 
53  template <typename MemoryAttachementMessage>
54  typename std::enable_if<std::is_base_of<hasMemoryAttachment
55  , typename std::decay<MemoryAttachementMessage>::type>::value, void>::type
56  queue(MemoryAttachementMessage&& msg) {
57  if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag())) {
58  msg.release();
59  return;
60  }
61  using Message = typename decay<MemoryAttachementMessage>::type;
62  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_
63  || sizeof(MemorySeg) > maxMessageSize_
64  || sizeof(StartMemorySegTrain) > maxMessageSize_)) {
65  HMBDC_THROW(std::out_of_range
66  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
67  }
68 
69  // + train header and actual msg
70  size_t n = (msg.hasMemoryAttachment::len + maxMemorySegPayloadSize_ - 1ul) / maxMemorySegPayloadSize_ + 2;
71  if (hmbdc_unlikely(n > buffer_.capacity())) {
72  HMBDC_THROW(std::out_of_range
73  , "send engine buffer too small capacity=" << buffer_.capacity() << "<" << n);
74  }
75  auto it = buffer_.claim(n);
76  queueMemorySegTrain(it, n, std::forward<MemoryAttachementMessage>(msg));
77  buffer_.commit(it, n);
78  }
79 
80  template <typename Message>
81  typename std::enable_if<!std::is_base_of<hasMemoryAttachment
82  , typename std::decay<Message>::type>::value, bool>::type
83  tryQueue(Message&& msg) HMBDC_RESTRICT {
84  if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag())) return true;
85  auto n = 1;
86  auto it = buffer_.tryClaim(n);
87  if (it) {
88  queue(it, std::forward<Message>(msg));
89  buffer_.commit(it, n);
90  return true;
91  }
92  return false;
93  }
94 
95  template <typename Message, typename ... Args>
96  void queueInPlace(Args&&... args) HMBDC_RESTRICT {
97  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
98  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_)) {
99  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
100  }
101  auto s = buffer_.claim();
102  TransportMessageHeader::copyToInPlace<Message>(*s, std::forward<Args>(args)...)->setSeq(s.seq_);
103  buffer_.commit(s);
104  }
105 
106  void queueJustBytes(uint16_t tag, void const* bytes, size_t len
107  , app::hasMemoryAttachment* att) {
108  if (!att) {
109  if (!minRecvToStart_ && !outboundSubscriptions_.check(tag)) return;
110  auto it = buffer_.claim();
111  auto s = *it;
112  char* addr = static_cast<char*>(s);
113  auto h = new (addr) TransportMessageHeader;
114  if (hmbdc_likely(len <= maxMessageSize_)) {
115  new (addr + sizeof(TransportMessageHeader)) MessageWrap<app::JustBytes>(tag, bytes, len, att);
116  h->messagePayloadLen = sizeof(MessageHead) + len;
117  h->setSeq(it.seq_);
118  } else {
119  HMBDC_THROW(std::out_of_range
120  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
121  }
122  buffer_.commit(it);
123  } else {
124  if (!minRecvToStart_ && !outboundSubscriptions_.check(tag)) {
125  att->release();
126  return;
127  }
128  if (hmbdc_unlikely(len > maxMessageSize_
129  || sizeof(MemorySeg) > maxMessageSize_
130  || sizeof(StartMemorySegTrain) > maxMessageSize_)) {
131  HMBDC_THROW(std::out_of_range
132  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
133  }
134 
135  // + train header and actual msg
136  size_t n = (att->len + maxMemorySegPayloadSize_ - 1ul) / maxMemorySegPayloadSize_ + 2;
137  if (hmbdc_unlikely(n > buffer_.capacity())) {
138  HMBDC_THROW(std::out_of_range
139  , "send engine buffer too small capacity=" << buffer_.capacity() << "<" << n);
140  }
141  auto it = buffer_.claim(n);
142  queueMemorySegTrain(it, n, tag, *att, len);
143  buffer_.commit(it, n);
144  }
145  }
146 
147  void runOnce() {
148  utils::EpollTask::instance().poll();
149  if (hmbdc_unlikely(minRecvToStart_ != std::numeric_limits<size_t>::max()
150  && asyncBackupSendServer_.readySessionCount() >= minRecvToStart_)) {
151  minRecvToStart_ = std::numeric_limits<size_t>::max(); //now started no going back
152  nmSendTransport_.startSend();
153  }
154  asyncBackupSendServer_.runOnce();
155  if (advertisedTypeTagsDirty_) {
156  std::unique_lock<std::mutex> g(advertisedTypeTags_.lock, std::try_to_lock);
157  if (g.owns_lock()) {
158  nmSendTransport_.setAds(asyncBackupSendServer_.advertisingMessages());
159  advertisedTypeTagsDirty_ = false;
160  }
161  }
162  nmSendTransport_.runOnce(asyncBackupSendServer_.sessionCount());
163  }
164 
165  void stop();
166  size_t sessionsRemainingActive() const {
167  return asyncBackupSendServer_.readySessionCount();
168  }
169 
170  template <MessageTupleC Messages, typename Node>
171  void advertiseFor(Node const& node, uint16_t mod, uint16_t res) {
172  std::scoped_lock<std::mutex> g(advertisedTypeTags_.lock);
173  advertisedTypeTags_.addPubsFor<Messages>(node, mod, res);
174  asyncBackupSendServer_.advertisingMessages(advertisedTypeTags_);
175  advertisedTypeTagsDirty_ = true;
176  }
177 
178 private:
179  size_t maxMessageSize_;
181  Buffer buffer_;
182 
183  Rater rater_;
184  NmSendTransport nmSendTransport_;
185  size_t minRecvToStart_;
186  ReoccuringTimer typeTagAdTimer_;
187  HMBDC_SEQ_TYPE lastBackupSeq_;
188  ReoccuringTimer flushTimer_;
189  bool waitForSlowReceivers_;
190  uint16_t maxMemorySegPayloadSize_;
191 
192  reliable::ToCleanupAttQueue toCleanupAttQueue_;
193  TypeTagSet outboundSubscriptions_;
194  AsyncBackupSendServer asyncBackupSendServer_;
195  bool stopped_;
196  std::atomic<bool> advertisedTypeTagsDirty_ = false;
197  TypeTagSetST advertisedTypeTags_;
198 
199  uint16_t
200  outBufferSizePower2();
201 
202  void queueMemorySegTrain(typename Buffer::iterator it, size_t n
203  , uint16_t tag, hasMemoryAttachment const& m, size_t len);
204 
205  template<typename M>
206  void queueMemorySegTrain(typename Buffer::iterator it, size_t n, M&& m) {
207  using Message = typename decay<M>::type;
208  //train head
209  auto s = *it;
210  char* addr = static_cast<char*>(s);
211  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
212  auto& trainHead = (new (addr + sizeof(TransportMessageHeader)) MessageWrap<StartMemorySegTrain>)
213  ->template get<StartMemorySegTrain>();
214 
215  trainHead.inbandUnderlyingTypeTag = m.getTypeTag();
216  trainHead.att = m;
217  trainHead.segCount = n - 2;
218  h->messagePayloadLen = sizeof(MessageWrap<StartMemorySegTrain>);
219  h->setSeq(it++.seq_);
220 
221  //train body
222  {
223  auto totalLen = (size_t)m.hasMemoryAttachment::len;
224  char* base = (char*)m.hasMemoryAttachment::attachment;
225  decltype(totalLen) offset = 0;
226  while(totalLen > offset) {
227  auto s = *it;
228  char* addr = static_cast<char*>(s);
229  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
230  auto& seg = (new (addr + sizeof(TransportMessageHeader)) MessageWrap<MemorySeg>)
231  ->template get<MemorySeg>();
232  seg.seg = base + offset;
233  auto l = (uint16_t)std::min((size_t)maxMemorySegPayloadSize_, totalLen - offset);
234  seg.len = l;
235  h->messagePayloadLen = sizeof(MessageHead) + l; //skipped bytes
236  h->setSeq(it++.seq_);
237  offset += l;
238  }
239  }
240 
241  //actual message
242  {
243  auto s = *it;
244  char* addr = static_cast<char*>(s);
245  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
246  new (addr + sizeof(TransportMessageHeader)) MessageWrap<Message>(std::forward<M>(m));
247  h->messagePayloadLen = sizeof(MessageWrap<Message>);
248  h->flag = app::hasMemoryAttachment::flag;
249  h->setSeq(it++.seq_);
250  }
251  }
252 
253 
254  template<typename M, typename ... Messages>
255  void queue(typename Buffer::iterator it
256  , M&& m, Messages&&... msgs) {
257  using Message = typename decay<M>::type;
258  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
259  auto s = *it;
260  char* addr = static_cast<char*>(s);
261  auto h = new (addr) TransportMessageHeader;
262  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
263  new (addr + sizeof(TransportMessageHeader)) MessageWrap<Message>(std::forward<M>(m));
264  h->messagePayloadLen = sizeof(MessageWrap<Message>);
265  h->setSeq(it.seq_);
266  } else {
267  HMBDC_THROW(std::out_of_range
268  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
269  }
270  if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
271  h->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
272  }
273  queue(++it, std::forward<Messages>(msgs)...);
274  }
275 
276  void queue(Buffer::iterator it) {}
277  void cullSlow();
278 };
279 
282 , Client<SendTransportEngine> {
283 
284  using SendTransport::SendTransport;
285 /**
286  * @brief if the user chooses not to have a Context to manage and run the engine,
287  * this method can be called from any thread from time to time to have the engine
288  * do its job
289  * @details thread safe and non-blocking, if the engine is already being powered by a Context,
290  * this method has no effect
291  */
292  void rotate() {
293  this->checkTimers(time::SysTime::now());
295  }
296 
297  /*virtual*/
298  void invokedCb(size_t) HMBDC_RESTRICT override {
299  this->runOnce();
300  }
301 
302  /*virtual*/
303  void stoppedCb(std::exception const& e) override {
304  HMBDC_LOG_C(e.what());
305  };
306 
307  /*virtual*/
308  bool droppedCb() override {
309  stop();
310  return true;
311  };
312 
313  using EngineTransport::hmbdcName;
314 
315  tuple<char const*, int> schedSpec() const {
316  return make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
317  }
318 };
319 }// sendtransportengine_detail
320 
321 using SendTransport = sendtransportengine_detail::SendTransport;
322 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
323 }}}
324 
325 namespace hmbdc { namespace tips { namespace rnetmap {
326 
327 namespace sendtransportengine_detail{
328  using namespace hmbdc::app;
329 
330 inline
331 SendTransport::
332 SendTransport(Config cfgIn
333  , size_t maxMessageSize)
334 : EngineTransport((cfgIn.setAdditionalFallbackConfig(Config(DefaultUserConfig))
335  , cfgIn.resetSection("tx", false)))
336 , maxMessageSize_(maxMessageSize)
337 , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(MessageHead)
338  , outBufferSizePower2())
339 , rater_(Duration::seconds(1u)
340  , config_.getExt<size_t>("sendBytesPerSec")
341  , config_.getExt<size_t>("sendBytesBurst")
342  , config_.getExt<size_t>("sendBytesBurst") != 0ul)
343 , nmSendTransport_(config_, maxMessageSize, buffer_, rater_, toCleanupAttQueue_)
344 , minRecvToStart_(config_.getExt<size_t>("minRecvToStart"))
345 , typeTagAdTimer_(Duration::seconds(config_.getExt<uint32_t>("typeTagAdvertisePeriodSeconds")))
346 , lastBackupSeq_(std::numeric_limits<HMBDC_SEQ_TYPE>::max())
347 , flushTimer_(Duration::microseconds(config_.getExt<uint32_t>("netRoundtripLatencyMicrosec")))
348 , waitForSlowReceivers_(config_.getExt<bool>("waitForSlowReceivers"))
349 , maxMemorySegPayloadSize_(std::min(mtu_ - sizeof(TransportMessageHeader) - sizeof(MessageHead)
350  , TransportMessageHeader::maxPayloadSize() - sizeof(MessageHead)))
351 , asyncBackupSendServer_(config_, buffer_, rater_, toCleanupAttQueue_, outboundSubscriptions_)
352 , stopped_(false) {
353  auto sendBytesBurst = config_.getExt<size_t>("sendBytesBurst");
354  auto sendBytesBurstMin = maxMessageSize + sizeof(TransportMessageHeader);
355  if (sendBytesBurst && sendBytesBurst < sendBytesBurstMin) {
356  HMBDC_THROW(std::out_of_range, "sendBytesBurst needs to >= " << sendBytesBurstMin);
357  }
358  toCleanupAttQueue_.set_capacity(buffer_.capacity() + 10);
359 
360  typeTagAdTimer_.setCallback(
361  [this](TimerManager& tm, SysTime const& now) {
362  nmSendTransport_.setAdPending();
363  if (!waitForSlowReceivers_) cullSlow();
364  }
365  );
366  schedule(SysTime::now(), typeTagAdTimer_);
367 
368  flushTimer_.setCallback(
369  [this](TimerManager& tm, SysTime const& now) {
370  nmSendTransport_.setSeqAlertPending();
371  }
372  );
373  schedule(SysTime::now(), flushTimer_);
374 }
375 
376 inline
377 void
378 SendTransport::
379 queueMemorySegTrain(typename Buffer::iterator it, size_t n
380  , uint16_t tag, hasMemoryAttachment const& m, size_t len) {
381  //train head
382  auto s = *it;
383  char* addr = static_cast<char*>(s);
384  auto h = new (addr) TransportMessageHeader;
385  auto& trainHead = (new (addr + sizeof(TransportMessageHeader)) MessageWrap<StartMemorySegTrain>)
386  ->template get<StartMemorySegTrain>();
387  trainHead.inbandUnderlyingTypeTag = tag;
388  trainHead.segCount = n - 2;
389  trainHead.att = m;
390  h->messagePayloadLen = sizeof(MessageWrap<StartMemorySegTrain>);
391  h->setSeq(it++.seq_);
392 
393  //train body
394  {
395  auto totalLen = (size_t)m.len;
396  char* base = (char*)m.attachment;
397  decltype(totalLen) offset = 0;
398  while(totalLen > offset) {
399  auto s = *it;
400  char* addr = static_cast<char*>(s);
401  auto h = new (addr) TransportMessageHeader;
402  auto& seg = (new (addr + sizeof(TransportMessageHeader)) MessageWrap<MemorySeg>)
403  ->template get<MemorySeg>();
404  seg.seg = base + offset;
405  auto l = (uint16_t)std::min((size_t)maxMemorySegPayloadSize_, totalLen - offset);
406  seg.len = l;
407  seg.inbandUnderlyingTypeTag = tag;
408  h->messagePayloadLen = sizeof(MessageHead) + l; //skipped bytes
409  h->setSeq(it++.seq_);
410  offset += l;
411  }
412  }
413 
414  //actual message
415  {
416  auto s = *it;
417  char* addr = static_cast<char*>(s);
418  auto h = new (addr) TransportMessageHeader;
419  new (addr + sizeof(TransportMessageHeader)) MessageWrap<app::JustBytes>(
420  tag, &m, len, &m);
421  h->messagePayloadLen = sizeof(MessageHead) + len;
422  h->flag = app::hasMemoryAttachment::flag;
423  h->setSeq(it++.seq_);
424  }
425 }
426 
427 inline
428 void
429 SendTransport::
430 stop() {
431  asyncBackupSendServer_.stop();
432  buffer_.reset(0);
433  buffer_.reset(1);
434  stopped_ = true;
435 };
436 
437 inline
438 uint16_t
439 SendTransport::
440 outBufferSizePower2() {
441  auto res = config_.getExt<uint16_t>("outBufferSizePower2");
442  if (res) {
443  return res;
444  }
445  res =hmbdc::numeric::log2Upper(512ul * 1024ul / (8ul + maxMessageSize_));
446  HMBDC_LOG_N("auto set --outBufferSizePower2=", res);
447  return res;
448 }
449 
450 inline
451 void
452 SendTransport::
453 cullSlow() {
454  if (hmbdc_likely(minRecvToStart_ == std::numeric_limits<size_t>::max())) {
455  auto seq = buffer_.readSeq(1);
456  if (seq == lastBackupSeq_ && buffer_.isFull()) {
457  asyncBackupSendServer_.killSlowestSession();
458  }
459  lastBackupSeq_ = seq;
460  }
461 }
462 }// sendtransportengine_detail
463 
464 }}}
465 
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: SendTransportEngine.hpp:303
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:308
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:298
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
void rotate()
if the user chooses not to have a Context to manage and run the engine, this method can be called from a...
Definition: SendTransportEngine.hpp:292
Definition: BlockingBuffer.hpp:11
Definition: Message.hpp:212
Definition: Message.hpp:405
Definition: Time.hpp:14
void schedule(SysTime fireAt, Timer &timer)
schedule the timer to start at a specific time
Definition: Timers.hpp:79
Definition: TypeTagSet.hpp:141
Definition: Message.hpp:263
Definition: Time.hpp:134
a Node is a thread of execution that can subscribe and receive Messages
Definition: Node.hpp:51
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: TypeTagSet.hpp:201
Definition: Rater.hpp:11
Definition: Message.hpp:418
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Base.hpp:12
Definition: Timers.hpp:117