hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/tips/tcpcast/SendServer.hpp"
5 #include "hmbdc/tips/tcpcast/Transport.hpp"
6 #include "hmbdc/tips/tcpcast/Messages.hpp"
7 #include "hmbdc/tips/udpcast/SendTransportEngine.hpp"
8 #include "hmbdc/tips/tcpcast/DefaultUserConfig.hpp"
9 #include "hmbdc//MetaUtils.hpp"
10 #include "hmbdc/time/Time.hpp"
11 #include "hmbdc/time/Rater.hpp"
12 #include "hmbdc/numeric/BitMath.hpp"
13 
14 #include <boost/circular_buffer.hpp>
15 #include <memory>
16 #include <tuple>
17 #include <regex>
18 #include <type_traits>
19 #include <mutex>
20 
21 namespace hmbdc { namespace tips { namespace tcpcast {
22 
23 namespace send_detail {
24 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
25 using namespace hmbdc::time;
26 using namespace hmbdc::app;
27 using namespace std;
28 using pattern::MonoLockFreeBuffer;
29 /**
30  * @brief capture the transportation mechanism
31  *
32  */
34 : Transport {
35  /**
36  * @brief ctor
37  * @param cfg JSON specifying the transport - see example, perf-tcpcast.cpp
38  * @param maxMessageSize max message size in bytes to be sent
39  */
40  SendTransport(Config, size_t);
41 
42  size_t subscribingPartyDetectedCount(uint16_t tag) const {
43  return outboundSubscriptions_.check(tag);
44  }
45 
46  size_t bufferedMessageCount() const {
47  return buffer_.remainingSize();
48  }
49 
50  template <MessageC Message>
51  void queue(Message&& msg) {
52  if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag())) {
53  using M = typename std::decay<Message>::type;
54  if constexpr (std::is_base_of<hasMemoryAttachment, M>::value) {
55  msg.release();
56  }
57  return;
58  }
59  auto n = 1;
60  auto it = buffer_.claim(n);
61  queue(it, std::forward<Message>(msg));
62  buffer_.commit(it, n);
63  }
64 
65  template <MessageC Message>
66  bool tryQueue(Message&& msg) {
67  if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag())) {
68  using M = typename std::decay<Message>::type;
69  if constexpr (std::is_base_of<hasMemoryAttachment, M>::value) {
70  msg.release();
71  }
72  return true; //don't try again
73  }
74  auto n = 1;
75  auto it = buffer_.tryClaim(n);
76  if (it) {
77  queue(it, std::forward<Message>(msg));
78  buffer_.commit(it, n);
79  return true;
80  }
81  return false;
82  }
83 
84  template <typename Message, typename ... Args>
85  void queueInPlace(Args&&... args) {
86  static_assert(!std::is_base_of<app::JustBytes, Message>::value
87  , "use queueJustBytes instead");
88  static_assert(std::is_trivially_destructible<Message>::value
89  , "cannot send message with dtor");
90  static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
92  , "hasMemoryAttachment has to the first base for Message");
93  auto s = buffer_.claim();
94  char* addr = static_cast<char*>(*s);
95  auto h = new (addr) TransportMessageHeader;
96  h->messagePayloadLen = sizeof(MessageWrap<Message>);
97  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
98  auto wrap = new (addr + sizeof(TransportMessageHeader))
99  MessageWrap<Message>(std::forward<Args>(args)...);
100  h->flag = wrap->scratchpad().desc.flag;
101  } else {
102  HMBDC_THROW(std::out_of_range
103  , "maxMessageSize too small to hold a message");
104  }
105  buffer_.commit(s);
106  }
107 
108  void queueJustBytes(uint16_t tag, void const* bytes, size_t len
109  , app::hasMemoryAttachment* att) {
110  if (!minRecvToStart_ && !outboundSubscriptions_.check(tag)) {
111  if (att) {
112  att->release();
113  }
114  return;
115  }
116  auto s = buffer_.claim();
117  char* addr = static_cast<char*>(*s);
118  auto h = new (addr) TransportMessageHeader;
119  h->messagePayloadLen = len + sizeof(MessageHead);
120  if (hmbdc_likely(len <= maxMessageSize_)) {
121  auto wrap = new (addr + sizeof(TransportMessageHeader))
122  MessageWrap<app::JustBytes>(tag, bytes, len, att);
123  h->flag = wrap->scratchpad().desc.flag;
124  } else {
125  HMBDC_THROW(std::out_of_range
126  , "maxMessageSize too small to hold a message");
127  }
128  buffer_.commit(s);
129  }
130 
131  void stop();
132 
133 protected:
134  size_t maxMessageSize_;
135  size_t minRecvToStart_;
136  MonoLockFreeBuffer buffer_;
137 
138  unique_ptr<udpcast::SendTransport> mcSendTransport_;
139  Rater rater_;
140  Config mcConfig_;
141  TypeTagSet outboundSubscriptions_;
142 
143 private:
144  template<typename M, typename ... Messages>
145  void queue(MonoLockFreeBuffer::iterator it, M&& m, Messages&&... msgs) {
146  using Message = typename std::decay<M>::type;
147  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
148  static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
150  , "hasMemoryAttachment has to the first base for Message");
151 
152  auto s = *it;
153  char* addr = static_cast<char*>(s);
154  auto h = new (addr) TransportMessageHeader;
155  h->messagePayloadLen = sizeof(MessageWrap<Message>);
156  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
157  auto wrap = new (addr + sizeof(TransportMessageHeader))
158  MessageWrap<Message>(std::forward<M>(m));
159  h->flag = wrap->scratchpad().desc.flag;
160  } else {
161  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
162  }
163  if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
164  h->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
165  }
166  queue(++it, std::forward<Messages>(msgs)...);
167  }
168 
169  void queue(MonoLockFreeBuffer::iterator it) {}
170 };
171 
174 , TimerManager
176 , Client<SendTransportEngine> {
177  SendTransportEngine(Config const&, size_t);
178  using SendTransport::hmbdcName;
179  using SendTransport::schedSpec;
180 
181  template <MessageTupleC Messages, typename Node>
182  void advertiseFor(Node const& node, uint16_t mod, uint16_t res) {
183  std::scoped_lock<std::mutex> g(advertisedTypeTags_.lock);
184  advertisedTypeTags_.addPubsFor<Messages>(node, mod, res);
185  server_->advertisingMessages(advertisedTypeTags_);
186  }
187 
188  void rotate() {
189  this->checkTimers(time::SysTime::now());
191  }
192 
193  /*virtual*/
194  void invokedCb(size_t) HMBDC_RESTRICT override {
195  runOnce();
196  mcSendTransport_->runOnce(true);
197  }
198 
199  /*virtual*/
200  bool droppedCb() override {
201  stop();
202  stopped_ = true;
203  return true;
204  };
205 
206  /**
207  * @brief check how many recipient sessions are still active
208  * @return numeric_limits<size_t>::max() if the sending hasn't started due to
209  * minRecvToStart has not been met yet
210  */
211  size_t sessionsRemainingActive() const {
212  return server_->readySessionCount();
213  }
214 
215 private:
216  void cullSlow() {
217  auto seq = buffer_.readSeq();
218  if (seq == lastSeq_ && buffer_.isFull()) {
219  server_->killSlowestSession();
220  }
221  lastSeq_ = seq;
222  }
223 
224  void runOnce();
225 
226  size_t mtu_;
227  size_t maxSendBatch_;
228  bool waitForSlowReceivers_;
229  MonoLockFreeBuffer::iterator begin_;
230  ToSend toSend_;
231  TypeTagSetST advertisedTypeTags_;
232 
233  std::optional<SendServer> server_;
234  size_t lastSeq_;
235  bool stopped_;
236 };
237 } //send_detail
238 
239 using SendTransport = send_detail::SendTransport;
240 using SendTransportEngine = send_detail::SendTransportEngine;
241 }}}
242 
243 namespace hmbdc { namespace tips { namespace tcpcast {
244 
245 namespace send_detail {
246 using namespace hmbdc::time;
247 using namespace std;
248 using pattern::MonoLockFreeBuffer;
249 
250 inline
253  , size_t maxMessageSize)
254 : Transport((cfg.setAdditionalFallbackConfig(Config(DefaultUserConfig))
255  , cfg.resetSection("tx", false)))
256 , maxMessageSize_(maxMessageSize)
257 , minRecvToStart_(config_.getExt<size_t>("minRecvToStart"))
258 , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(MessageHead)
259  , config_.getExt<uint16_t>("outBufferSizePower2")
260  ?config_.getExt<uint16_t>("outBufferSizePower2")
261  :hmbdc::numeric::log2Upper(128ul * 1024ul / maxMessageSize)
262 )
263 , rater_(Duration::seconds(1u)
264  , config_.getExt<size_t>("sendBytesPerSec")
265  , config_.getExt<size_t>("sendBytesBurst")
266  , config_.getExt<size_t>("sendBytesBurst") != 0ul //no rate control by default
267 )
268 , mcConfig_(config_) {
269  mcConfig_.put("loopback", true); //always allow advertising to loopback, so other process
270  //on the same machine get them
271  mcConfig_.put("outBufferSizePower2", 3u);
272 
273  mcSendTransport_.reset(new udpcast::SendTransport(
274  mcConfig_, sizeof(MessageWrap<TypeTagSource>)));
275 }
276 
277 inline
278 void
279 SendTransport::
280 stop() {
281  buffer_.reset();
282 };
283 
284 inline
285 SendTransportEngine::
286 SendTransportEngine(Config const& cfg
287  , size_t maxMessageSize)
288 : SendTransport(cfg, maxMessageSize)
289 , ReoccuringTimer(Duration::seconds(config_.getExt<size_t>("typeTagAdvertisePeriodSeconds")))
290 , mtu_(config_.getExt<size_t>("mtu") - 20 - 20) //20 bytes ip header and 20 bytes tcp header
291 , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
292 , waitForSlowReceivers_(config_.getExt<bool>("waitForSlowReceivers"))
293 , lastSeq_(0)
294 , stopped_(false) {
295  toSend_.reserve(maxSendBatch_ * 2);
296  // inititialize begin_ to first item
297  MonoLockFreeBuffer::iterator end;
298  buffer_.peek(begin_, end, 0);
299 
300  server_.emplace(config_, buffer_.capacity(), outboundSubscriptions_);
301  setCallback(
302  [this](TimerManager& tm, SysTime const& now) {
303  std::unique_lock<std::mutex> g(advertisedTypeTags_.lock, std::try_to_lock);
304  if (!g.owns_lock()) return;
305  for (auto& ad : server_->advertisingMessages()) {
306  mcSendTransport_->queue(ad);
307  }
308  if (!waitForSlowReceivers_) cullSlow();
309  }
310  );
311 
312  schedule(SysTime::now(), *this);
313 }
314 
315 inline
316 void
317 SendTransportEngine::
318 runOnce() HMBDC_RESTRICT {
319  MonoLockFreeBuffer::iterator begin, end;
320  //only START to send when enough sessions are ready to receive or
321  //buffer is full
322  if (hmbdc_likely(!minRecvToStart_
323  || server_->readySessionCount() >= minRecvToStart_)) {
324  minRecvToStart_ = 0; //now started no going back
325  buffer_.peek(begin, end, maxSendBatch_);
326  } else {
327  buffer_.peek(begin, end, 0);
328  }
329 
330  auto it = begin_;
331  uint16_t currentTypeTag = 0;
332  size_t toSendByteSize = 0;
333 
334  while (it != end) {
335  void* ptr = *it;
336  auto item = static_cast<TransportMessageHeader*>(ptr);
337 
338  if (hmbdc_unlikely(!rater_.check(item->wireSize()))) break;
339  if (hmbdc_unlikely(!item->flag
340  && toSendByteSize + item->wireSize() > mtu_)) break;
341  it++;
342  if (hmbdc_unlikely(!currentTypeTag)) {
343  currentTypeTag = item->typeTag();
344  toSendByteSize = item->wireSize();
345  toSend_.push_back(iovec{ptr, item->wireSize()});
346  } else if (item->typeTag() == currentTypeTag) {
347  toSendByteSize += item->wireSize();
348  toSend_.push_back(iovec{ptr, item->wireSize()});
349  } else {
350  server_->queue(currentTypeTag
351  , move(toSend_)
352  , toSendByteSize
353  , it);
354  toSend_.clear();
355 
356  currentTypeTag = item->typeTag();
357  toSendByteSize = item->wireSize();
358  toSend_.push_back(iovec{ptr, item->wireSize()});
359  }
360 
361  if (hmbdc_unlikely(item->flag == hasMemoryAttachment::flag)) {
362  auto& a = item->wrapped<hasMemoryAttachment>();
363  toSend_.push_back(iovec{a.attachment, a.len});
364  toSendByteSize += a.len;
365  }
366  rater_.commit();
367  }
368 
369  if (toSend_.size()) {
370  server_->queue(currentTypeTag
371  , move(toSend_)
372  , toSendByteSize
373  , it);
374  toSend_.clear();
375  }
376  begin_ = it;
377  //use it here which is the maximum
378  auto newStart = server_->runOnce(begin, it);
379  if (begin != end) { //only need to do this when there r things read from buffer_
380  for (auto it = begin; it != newStart; ++it) {
381  void* ptr = *it;
382  auto item = static_cast<TransportMessageHeader*>(ptr);
383  if (hmbdc_unlikely(item->flag == hasMemoryAttachment::flag)) {
384  auto& a = item->wrapped<hasMemoryAttachment>();
385  a.release();
386  }
387  }
388  buffer_.wasteAfterPeek(begin, newStart - begin, true);
389  }
390 }
391 } //send_detail
392 }}}
393 
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:194
Definition: MonoLockFreeBuffer.hpp:16
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:211
Definition: Base.hpp:12
SendTransport(Config, size_t)
ctor
Definition: SendTransportEngine.hpp:252
class to hold an hmbdc configuration
Definition: Config.hpp:45
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:200
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
Definition: Transport.hpp:65
Definition: SendTransportEngine.hpp:172
Definition: Message.hpp:212
Definition: Time.hpp:14
Definition: TypeTagSet.hpp:141
Definition: Message.hpp:263
Definition: Time.hpp:134
Config & put(Args &&... args)
forward the call to ptree's put but return Configure
Definition: Config.hpp:195
a Node is a thread of execution that can subscribe to and receive Messages
Definition: Node.hpp:51
Definition: Rater.hpp:10
capture the transportation mechanism
Definition: SendTransportEngine.hpp:33
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: TypeTagSet.hpp:201
Definition: Rater.hpp:11
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Base.hpp:12
void wasteAfterPeek(iterator, size_t, bool=false)
if size not matching - please refer to the impl for details
Definition: MonoLockFreeBuffer.hpp:160
Definition: Timers.hpp:117
Definition: LockFreeBufferMisc.hpp:89