hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/udpcast/Transport.hpp"
4 #include "hmbdc/tips/udpcast/Messages.hpp"
5 #include "hmbdc/tips/udpcast/DefaultUserConfig.hpp"
6 #include "hmbdc/app/Client.hpp"
7 #include "hmbdc/comm/inet/Endpoint.hpp"
8 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
9 #include "hmbdc/time/Time.hpp"
10 #include "hmbdc/time/Rater.hpp"
11 #include "hmbdc/numeric/BitMath.hpp"
12 
13 #include <regex>
14 #include <memory>
15 #include <iostream>
16 #include <mutex>
17 
18 namespace hmbdc { namespace tips { namespace udpcast {
19 
20 namespace sendtransportengine_detail {
21 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
22 using namespace std;
23 using namespace hmbdc::app;
24 using namespace hmbdc::time;
25 using namespace hmbdc::pattern;
27 
29 : Transport {
30  SendTransport(Config, size_t);
31  ~SendTransport();
32 
33  size_t bufferedMessageCount() const {
34  return buffer_.remainingSize();
35  }
36 
37  size_t subscribingPartyDetectedCount(uint16_t tag) const {
38  return 0; //not supported
39  }
40 
41  template <MessageTupleC Messages, typename Node>
42  void advertiseFor(Node const& node, uint16_t mod, uint16_t res) {
43  //only applies to connection oriented transport
44  }
45 
46  template <MessageC Message>
47  void queue(Message&& msg) {
48  auto n = 1;
49  auto it = buffer_.claim(n);
50  queue(it, std::forward<Message>(msg));
51  buffer_.commit(it, n);
52  }
53 
54  template <typename Message>
55  bool tryQueue(Message&& msg) {
56  auto n = 1;
57  auto it = buffer_.tryClaim(n);
58  if (it) {
59  queue(it, std::forward<Message>(msg));
60  buffer_.commit(it, n);
61  return true;
62  }
63  return false;
64  }
65 
66  // template <typename Message, typename ... Args>
67  // void queueInPlace(Args&&... args) {
68  // static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
69  // auto s = buffer_.claim();
70  // char* addr = static_cast<char*>(*s);
71  // auto h = reinterpret_cast<TransportMessageHeader*>(addr);
72  // if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
73  // new (addr + sizeof(TransportMessageHeader)) MessageWrap<Message>(std::forward<Args>(args)...);
74  // h->messagePayloadLen() = sizeof(MessageWrap<Message>);
75  // } else {
76  // HMBDC_THROW(std::out_of_range
77  // , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
78  // }
79  // buffer_.commit(s);
80  // }
81 
82  void runOnce(bool alwaysPoll = false) HMBDC_RESTRICT {
83  resumeSend();
84  if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
85  utils::EpollTask::instance().poll();
86  }
87  }
88 
89  // template <typename M>
90  // static TransportMessageHeader* encode(Topic const& t, void* buf, size_t bufLen, M const& m) {
91  // using Message = typename std::decay<M>::type;
92  // char* addr = static_cast<char*>(buf);
93  // auto h = reinterpret_cast<TransportMessageHeader*>(addr);
94  // if (hmbdc_likely(sizeof(Message) <= bufLen)) {
95  // new (addr + sizeof(TransportMessageHeader)) MessageWrap<Message>(m);
96  // h->messagePayloadLen() = sizeof(MessageWrap<Message>);
97  // } else {
98  // HMBDC_THROW(std::out_of_range, "bufLen too small to hold a message");
99  // }
100  // return h;
101  // }
102 
103  void stop();
104 
105 private:
106  size_t maxMessageSize_;
107  typename Buffer::iterator begin_, it_, end_;
108  Buffer buffer_;
109  Rater rater_;
110  size_t maxSendBatch_;
111 
112  iovec* toSendMsgs_;
113  size_t toSendMsgsHead_;
114  size_t toSendMsgsTail_;
115  mmsghdr* toSendPkts_;
116  size_t toSendPktsHead_;
117  size_t toSendPktsTail_;
118  std::vector<comm::inet::Endpoint> udpcastDests_;
119 
120  uint16_t
121  outBufferSizePower2();
122 
123  template<typename M, typename ... Messages>
124  void queue(typename Buffer::iterator it, M&& m, Messages&&... msgs) {
125  using Message = typename std::decay<M>::type;
126  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
127  auto s = *it;
128  char* addr = static_cast<char*>(s);
129  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
130  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
131  new (addr + sizeof(TransportMessageHeader)) MessageWrap<Message>(std::forward<M>(m));
132  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
133  } else {
134  HMBDC_THROW(std::out_of_range
135  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
136  }
137  if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
138  h->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
139  }
140  queue(++it, std::forward<Messages>(msgs)...);
141  }
142 
143  void queue(typename Buffer::iterator it) {}
144  void resumeSend();
145 };
146 
149 , Client<SendTransportEngine> {
150  using SendTransport::SendTransport;
151  using SendTransport::hmbdcName;
152  using SendTransport::schedSpec;
153 
154  void rotate() {
156  }
157 
158  /*virtual*/
159  void invokedCb(size_t) HMBDC_RESTRICT override {
160  runOnce();
161  }
162  /*virtual*/ bool droppedCb() override {
163  stop();
164  return true;
165  };
166 
167  using Transport::hmbdcName;
168 };
169 
170 } //sendtransportengine_detail
171 using SendTransport = sendtransportengine_detail::SendTransport;
172 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
173 }}}
174 
175 
176 
177 namespace hmbdc { namespace tips { namespace udpcast {
178 
179 namespace sendtransportengine_detail {
180 using namespace hmbdc::time;
181 
182 using namespace hmbdc::pattern;
183 using namespace std;
184 
185 inline
186 SendTransport::
187 SendTransport(Config cfg
188  , size_t maxMessageSize)
189 : Transport((cfg.setAdditionalFallbackConfig(Config(DefaultUserConfig))
190  , cfg.resetSection("tx", false)))
191 , maxMessageSize_(maxMessageSize)
192 , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(MessageHead), outBufferSizePower2())
193 , rater_(Duration::seconds(1u)
194  , config_.getExt<size_t>("sendBytesPerSec")
195  , config_.getExt<size_t>("sendBytesBurst")
196  , config_.getExt<size_t>("sendBytesBurst") != 0ul) //no rate control by default
197 , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
198 , toSendMsgs_(new iovec[maxSendBatch_])
199 , toSendMsgsHead_(0)
200 , toSendMsgsTail_(0)
201 , toSendPkts_(new mmsghdr[maxSendBatch_])
202 , toSendPktsHead_(0)
203 , toSendPktsTail_(0) {
204  using namespace hmbdc::app;
205  char loopch = config_.getExt<bool>("loopback")?1:0;
206  if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch)) < 0) {
207  HMBDC_THROW(runtime_error, "failed to set loopback=" << config_.getExt<bool>("loopback"));
208  }
209 
210  auto sz = config_.getExt<int>("udpSendBufferBytes");
211  if (sz) {
212  if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)) < 0) {
213  HMBDC_LOG_C("failed to set send buffer size=", sz);
214  }
215  }
216 
217  auto ttl = config_.getExt<int>("ttl");
218  if (ttl > 0 && setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0) {
219  HMBDC_THROW(runtime_error, "failed to set set ttl=" << ttl);
220  }
221 
222  utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, *this);
223  auto minHead = sizeof(MessageHead) + sizeof(TransportMessageHeader);
224  if (maxMessageSize_ + minHead > mtu_) {
225  HMBDC_THROW(std::out_of_range, "maxMessageSize needs <= " << mtu_ - minHead);
226  }
227  cfg(udpcastDests_, "udpcastDests");
228  if (udpcastDests_.size() == 0) {
229  HMBDC_THROW(std::out_of_range, "empty udpcastDests");
230  }
231  memset(toSendMsgs_, 0, sizeof(iovec) * maxSendBatch_);
232  memset(toSendPkts_, 0, sizeof(mmsghdr) * maxSendBatch_);
233  std::for_each(toSendPkts_, toSendPkts_ + maxSendBatch_
234  , [this](mmsghdr& pkt) {
235  pkt.msg_hdr.msg_name = &udpcastDests_.begin()->v;
236  pkt.msg_hdr.msg_namelen = sizeof(udpcastDests_.begin()->v);
237  });
238 }
239 
240 inline
241 SendTransport::
242 ~SendTransport() {
243  delete []toSendPkts_;
244  delete []toSendMsgs_;
245 }
246 
247 inline
248 void
249 SendTransport::
250 stop() {
251  buffer_.reset();
252 }
253 
254 inline
255 uint16_t
256 SendTransport::
257 outBufferSizePower2() {
258  auto res = config_.getExt<uint16_t>("outBufferSizePower2");
259  if (res) {
260  return res;
261  }
262  res =hmbdc::numeric::log2Upper(8ul * 1024ul / (8ul + maxMessageSize_));
263  HMBDC_LOG_N("auto set --outBufferSizePower2=", res);
264  return res;
265 }
266 
267 inline
268 void
269 SendTransport::
270 resumeSend() HMBDC_RESTRICT {
271  bool rateOk = true;
272  while (rateOk) {
273  if (it_ == end_) {
274  buffer_.wasteAfterPeek(begin_, end_ - begin_);
275  buffer_.peek(it_, end_, maxSendBatch_);
276  begin_ = it_;
277  }
278  //make a packet
279  size_t packetBytes = 0;
280  while (it_ != end_ && toSendMsgsTail_ < maxSendBatch_) {
281  void* ptr = *it_;
282  auto item = static_cast<TransportMessageHeader*>(ptr);
283  rateOk = rater_.check(item->wireSize());
284  if (hmbdc_unlikely(!rateOk)) {
285  break;
286  }
287  packetBytes += item->wireSize();
288  if (hmbdc_unlikely(packetBytes > mtu_)) {
289  toSendPkts_[toSendPktsTail_].msg_hdr.msg_iov = toSendMsgs_ + toSendMsgsHead_;
290  toSendPkts_[toSendPktsTail_++].msg_hdr.msg_iovlen
291  = toSendMsgsTail_ - toSendMsgsHead_;
292  toSendMsgsHead_ = toSendMsgsTail_;
293  packetBytes = item->wireSize();
294  }
295 
296  toSendMsgs_[toSendMsgsTail_].iov_base = ptr;
297  toSendMsgs_[toSendMsgsTail_++].iov_len = item->wireSize();
298  rater_.commit();
299  it_++;
300  }
301  if (packetBytes) {
302  //wrap up current packet
303  toSendPkts_[toSendPktsTail_].msg_hdr.msg_iov = toSendMsgs_ + toSendMsgsHead_;
304  toSendPkts_[toSendPktsTail_++].msg_hdr.msg_iovlen
305  = toSendMsgsTail_ - toSendMsgsHead_;
306  toSendMsgsHead_ = toSendMsgsTail_;
307  }
308 
309  auto toSendPktsCount = toSendPktsTail_ - toSendPktsHead_;
310  if (toSendPktsCount && isFdReady()) {
311  int l = 0;
312  l = sendmmsg(fd, toSendPkts_ + toSendPktsHead_, toSendPktsCount, MSG_NOSIGNAL|MSG_DONTWAIT);
313  if (hmbdc_unlikely(udpcastDests_.size() > 1)) {
314  for (auto i = 1u; hmbdc_unlikely(i < udpcastDests_.size()); ++i) {
315  auto& addr = udpcastDests_[i].v;
316  std::for_each(toSendPkts_ + toSendPktsHead_, toSendPkts_ + toSendPktsHead_ + toSendPktsCount
317  , [&addr](mmsghdr& pkt) {
318  pkt.msg_hdr.msg_name = &addr;
319  }
320  );
321  l = std::max(l, sendmmsg(fd, toSendPkts_ + toSendPktsHead_, toSendPktsCount, MSG_NOSIGNAL|MSG_DONTWAIT));
322  }
323  std::for_each(toSendPkts_ + toSendPktsHead_, toSendPkts_ + toSendPktsHead_ + toSendPktsCount
324  , [this](mmsghdr& pkt) {
325  pkt.msg_hdr.msg_name = &udpcastDests_[0].v;
326  }
327  );
328  }
329  if (hmbdc_likely(l > 0)) {
330  } else if (hmbdc_likely(l < 0)) {
331  if (hmbdc_unlikely(!checkErr())) {
332  HMBDC_LOG_C("sendmmsg failed errno=", errno);
333  }
334  return;
335  } else {
336  return; //try again later
337  }
338 
339  toSendPktsHead_ += l;
340  if (toSendPktsHead_ == toSendPktsTail_) {
341  toSendPktsHead_ = toSendPktsTail_ = 0;
342  toSendMsgsHead_ = toSendMsgsTail_ = 0;
343  }
344  } else {
345  //nothing to do now
346  return;
347  }
348  }
349 }
350 } //sendtransportengine_detail
351 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: MonoLockFreeBuffer.hpp:16
Definition: Transport.hpp:39
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:162
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: TypedString.hpp:84
Definition: BlockingBuffer.hpp:11
Definition: Message.hpp:212
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:159
Definition: Message.hpp:263
Definition: Time.hpp:134
a Node is a thread of execution that can suscribe and receive Messages
Definition: Node.hpp:51
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:122
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89