hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
8 
9 #include "hmbdc/tips/netmap/Messages.hpp"
10 #include "hmbdc/tips/netmap/DefaultUserConfig.hpp"
11 #include "hmbdc/app/Base.hpp"
12 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
13 #include "hmbdc/comm/eth/Misc.h"
14 #include "hmbdc/time/Rater.hpp"
15 #include "hmbdc/numeric/BitMath.hpp"
16 
17 #include <memory>
18 #include <type_traits>
19 #include <mutex>
20 
21 
22 #include <netinet/ether.h> /* ether_aton */
23 #include <linux/if_packet.h> /* sockaddr_ll */
24 #include <sys/sysctl.h> /* sysctl */
25 #include <ifaddrs.h> /* getifaddrs */
26 
27 #include <poll.h>
28 
29 namespace hmbdc { namespace tips { namespace netmap {
30 
31 struct NetContext;
32 struct Sender;
33 
34 namespace sendtransportengine_detail {
35 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
36 using namespace std;
37 using namespace hmbdc::app;
38 using namespace hmbdc::time;
39 using namespace hmbdc::comm::eth;
40 
41 /**
42  * @brief powers the sending functions of a netmap port
43  * @details this needs to be created using NetContext and started in a tips::Context
44  *
45  */
47 : Client<SendTransportEngine> {
48  SendTransportEngine(Config, size_t);
49  size_t bufferedMessageCount() const {
50  return buffer_.remainingSize();
51  }
52 
53  size_t subscribingPartyDetectedCount(uint16_t tag) const {
54  return 0; //not supported
55  }
56 
57  template <MessageTupleC Messages, typename Node>
58  void advertiseFor(Node const& node, uint16_t mod, uint16_t res) {
59  //only applies to connection oriented transport
60  }
61 
62 /**
63  * @brief if the user chooses not to have a Context to manage and run the engine
64  * this method can be called from any thread from time to time to have the engine
65  * do its job
66  * @details thread safe and non-blocking, if the engine is already being powered by a Context,
67  * this method has no effect
68  */
69  void rotate() {
71  }
72 
73  void stop();
74 
76  bool droppedCb() override;
77 
78  /*virtual*/
79  void invokedCb(size_t) HMBDC_RESTRICT override {
80  for (int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
81  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
82  if (nm_ring_empty(txring))
83  continue;
84 
85  sendPackets(txring);
86  }
87  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
88  HMBDC_THROW(std::runtime_error, "IO error");
89  }
90  }
91 
92  void stoppedCb(std::exception const& e) override;
93 
94  char const* hmbdcName() const {
95  return this->hmbdcName_.c_str();
96  }
97 
98  std::tuple<char const*, int> schedSpec() const {
99  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
100  }
101 
102  template <typename Message>
103  void queue(Message&& msg) HMBDC_RESTRICT {
104  auto n = 1;
105  auto it = buffer_.claim(n);
106  queue(it, std::forward<Message>(msg));
107  buffer_.commit(it, n);
108  }
109 
110  template <typename Message>
111  bool tryQueue(Message&& msg) HMBDC_RESTRICT {
112  auto n = 1;
113  auto it = buffer_.tryClaim(n);
114  if (it) {
115  queue(it, std::forward<Message>(msg));
116  buffer_.commit(it, n);
117  return true;
118  }
119  return false;
120  }
121 
122  template <typename M, typename... Messages>
123  void queue(pattern::MonoLockFreeBuffer::iterator it, M&& msg, Messages&&... msgs) {
124  using Message = typename std::decay<M>::type;
125  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
126  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_)) {
127  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
128  }
129  auto s = *it;
130  auto tmh = TransportMessageHeader::copyTo(s, std::forward<M>(msg));
131 
132  if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
133  tmh->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
134  } else (void)tmh;
135  queue(++it, std::forward<Messages>(msgs)...);
136  }
137 
138  template <typename Message, typename ... Args>
139  void queueInPlace(Args&&... args) HMBDC_RESTRICT {
140  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
141  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_)) {
142  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
143  }
144  auto s = buffer_.claim();
145  TransportMessageHeader::copyToInPlace<Message>(*s, std::forward<Args>(args)...);
146  buffer_.commit(s);
147  }
148 
149  // void queueBytes(uint16_t tag, void const* bytes, size_t len);
150  void queue(pattern::MonoLockFreeBuffer::iterator it) {}
151 
152 private:
153  uint16_t outBufferSizePower2();
154  void sendPackets(struct netmap_ring *);
155 
156  void getMacAddresses();
157  static
158  void initializePacket(struct pkt *, int, std::string, std::string, ether_addr, ether_addr, uint16_t, uint16_t);
159 
160  static
161  void updatePacket(struct pkt *, size_t, bool = true);
162 
163  Config const config_;
164  string hmbdcName_;
165  string schedPolicy_;
166  int schedPriority_;
167  size_t maxMessageSize_;
168 
169 
170  struct nm_desc *nmd_;
171  pattern::MonoLockFreeBuffer buffer_;
172  int virtHeader_; //v hdr len
173 
174 
175  ether_addr srcEthAddr_;
176  ether_addr dstEthAddr_;
177  pkt precalculatedPacketHead_;
178  bool doChecksum_;
179  Rater rater_;
180  size_t maxSendBatch_;
181  uint16_t mtu_;
182 };
183 
184 } //sendtransportengine_detail
185 
186 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
187 
188 }}}
189 
190 namespace hmbdc { namespace tips { namespace netmap {
191 
192 namespace sendtransportengine_detail {
193 using namespace std;
194 using namespace hmbdc::app;
195 using namespace hmbdc::time;
196 using namespace hmbdc::comm::eth;
197 
198 inline
199 uint16_t
200 SendTransportEngine::
201 outBufferSizePower2() {
202  auto res = config_.getExt<uint16_t>("outBufferSizePower2");
203  if (res) {
204  return res;
205  }
206  res =hmbdc::numeric::log2Upper(16ul * 1024ul / (8ul + maxMessageSize_));
207  HMBDC_LOG_N("auto set --outBufferSizePower2=", res);
208  return res;
209 }
210 
211 inline
212 SendTransportEngine::
213 SendTransportEngine(Config cfg, size_t maxMessageSize)
214 : config_((cfg.setAdditionalFallbackConfig(Config(DefaultUserConfig))
215  , cfg.resetSection("tx", false)))
216 , maxMessageSize_(maxMessageSize)
217 , nmd_(nullptr)
218 , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(MessageHead)
219  , outBufferSizePower2())
220 , virtHeader_(0)
221 , srcEthAddr_{{0}}
222 , dstEthAddr_{{0}}
223 , doChecksum_(config_.getExt<bool>("doChecksum"))
224 , rater_(Duration::seconds(1u)
225  , config_.getExt<size_t>("sendBytesPerSec")
226  , config_.getExt<size_t>("sendBytesBurst")
227  , config_.getExt<size_t>("sendBytesBurst") != 0ul //no rate control by default
228 )
229 , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
230 , mtu_(config_.getExt<size_t>("mtu")) {
231  mtu_ -= (8u + 20u); // 8bytes udp header and 20bytes ip header
232  config_(hmbdcName_, "hmbdcName")
233  (schedPolicy_, "schedPolicy")
234  (schedPriority_, "schedPriority")
235  ;
236 
237  memcpy(&srcEthAddr_, ether_aton(config_.getExt<std::string>("srcEthAddr").c_str())
238  , sizeof(srcEthAddr_));
239  memcpy(&dstEthAddr_, ether_aton(config_.getExt<std::string>("dstEthAddr").c_str())
240  , sizeof(dstEthAddr_));
241  getMacAddresses();
242 
243  struct nmreq baseNmd;
244  bzero(&baseNmd, sizeof(baseNmd));
245  baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
246  config_(baseNmd.nr_tx_slots, "nmTxSlots");
247  config_(baseNmd.nr_rx_slots, "nmRxSlots");
248  config_(baseNmd.nr_tx_rings, "nmTxRings");
249  config_(baseNmd.nr_rx_rings, "nmRxRings");
250 
251  auto nmport = config_.getExt<std::string>("netmapPort");
252  uint32_t flags = config_.getExt<uint32_t>("nmOpenFlags");
253  nmd_ = nm_open(nmport.c_str(), &baseNmd, flags, NULL);
254  if (!nmd_) {
255  HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
256  }
257 
258  if (nmd_->first_tx_ring != nmd_->last_tx_ring) {
259  HMBDC_LOG_W("multiple tx rings exist on ", nmport, " the recv side could receive out of order messages."
260  " to avoid it, use more specific netmapPort with the ring number. for example: netmap::p2p1-2");
261  }
262 
263  struct nmreq req;
264  memset(&req, 0, sizeof(req));
265  bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
266  req.nr_version = NETMAP_API;
267  req.nr_cmd = NETMAP_VNET_HDR_GET;
268  int err = ioctl(nmd_->fd, NIOCREGIF, &req);
269  if (err) {
270  HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
271  }
272  virtHeader_ = req.nr_arg1;
273 
274  initializePacket(&precalculatedPacketHead_
275  , config_.getExt<uint16_t>("ttl")
276  , config_.getExt<string>("srcIp")
277  , config_.getExt<string>("dstIp")
278  , srcEthAddr_
279  , dstEthAddr_
280  , config_.getExt<uint16_t>("srcPort")
281  , config_.getExt<uint16_t>("dstPort")
282  );
283  sleep(config_.getExt<int>("nmResetWaitSec"));
284  //cleanup rings
285  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
286  HMBDC_THROW(std::runtime_error, "IO error");
287  }
288  for (int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
289  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
290  txring->head = txring->cur = txring->tail;
291  }
292 }
293 
294 inline
295 void
296 SendTransportEngine::
297 stop() {
298  buffer_.reset();
299 };
300 
301 inline
302 SendTransportEngine::
303 ~SendTransportEngine() {
304  nm_close(nmd_);
305 }
306 
307 /*virtual*/
308 inline
309 bool
312  buffer_.reset();
313  return true;
314 }
315 
316 inline
317 void
319 stoppedCb(std::exception const& e) {
320  HMBDC_LOG_C(e.what());
321 };
322 
323 inline
324 void
325 SendTransportEngine::
326 sendPackets(struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
327  uint32_t cur = ring->cur;
328  if (hmbdc_unlikely(cur == ring->tail)) return;
330  auto limit = maxSendBatch_ * ((ring->tail - cur) % ring->num_slots);
331  if (hmbdc_unlikely(!(buffer_.peek(begin, end, limit)))) {
332  return;
333  }
334  bool slotInited = false;
335  auto it = begin;
336  auto batch = maxSendBatch_;
337  struct netmap_slot *slot = &ring->slot[cur];
338  uint32_t slotLen = 0;
339  char *p = NETMAP_BUF(ring, slot->buf_idx);
340  pkt* currentPktPtr = (pkt*)(p + virtHeader_ - sizeof(virt_header));
341  uint16_t slotLenMax = min(mtu_, (uint16_t)ring->nr_buf_size);
342  for (; it != end;) {
343  auto th = reinterpret_cast<TransportMessageHeader*>(*it);
344  if (hmbdc_likely(rater_.check(th->wireSize()))) {
345  if (!slotInited) {
346  auto wireSize = (uint16_t)(
347  sizeof(ether_header) + sizeof(ip) + sizeof(udphdr) + virtHeader_
348  );
349  memcpy(p, ((char*)&precalculatedPacketHead_) + sizeof(virt_header) - virtHeader_
350  , wireSize);
351  slotLen = wireSize;
352  slotInited = true;
353  }
354  auto wireSize = th->wireSize();
355  if (slotLen + wireSize <= slotLenMax) {
356  memcpy(p + slotLen, th, (int)wireSize);
357  slotLen += wireSize;
358  rater_.commit();
359  batch--;
360  ++it;
361  } else {
362  batch = 0; //this batch is done
363  }
364  if (!batch) {
365  slot->len = slotLen;
366  size_t wireSizeExcludingHead = slotLen
367  - (sizeof(ether_header) + sizeof(ip) + sizeof(udphdr) + virtHeader_);
368  updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
369  cur = nm_ring_next(ring, cur);
370  slotLen = 0;
371  if (cur == ring->tail) break;
372  slot = &ring->slot[cur];
373  p = NETMAP_BUF(ring, slot->buf_idx);
374  currentPktPtr = (pkt*)(p + virtHeader_ - sizeof(virt_header));
375  batch = maxSendBatch_;
376  slotInited = false;
377  }
378  } else {
379  break;
380  }
381  }
382  if (slotLen) {
383  slot->len = slotLen;
384  size_t wireSizeExcludingHead = slotLen
385  - (sizeof(ether_header) + sizeof(ip) + sizeof(udphdr) + virtHeader_);
386  updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
387  cur = nm_ring_next(ring, cur);
388  }
389 
390  ring->head = ring->cur = cur;
391  buffer_.wasteAfterPeek(begin, it - begin, true);
392 }
393 
394 inline
395 void
396 SendTransportEngine::
397 getMacAddresses() {
398  auto nmport = config_.getExt<std::string>("netmapPort");
399 
400  if (strncmp(nmport.c_str(), "vale", 4) == 0) return;
401 
402  if (nmport.find_first_of(":") == std::string::npos) {
403  HMBDC_THROW(std::runtime_error
404  , "wrong netmapPort format (examples: netmap:eth0, netmap:eth0-0)");
405  }
406  auto iface = nmport.substr(nmport.find_first_of(":"));
407  iface = iface.substr(1, iface.find_first_of("-^") - 1);
408 
409 
410  struct ifaddrs *ifaphead, *ifap;
411  int l = sizeof(ifap->ifa_name);
412 
413  if (getifaddrs(&ifaphead) != 0) {
414  HMBDC_THROW(std::runtime_error, "getifaddrs failed for" << iface);
415  }
416  for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
417  struct sockaddr_ll *sll =
418  (struct sockaddr_ll *)ifap->ifa_addr;
419  uint8_t *mac;
420 
421  if (!sll || sll->sll_family != AF_PACKET)
422  continue;
423  if (strncmp(ifap->ifa_name, iface.c_str(), l) != 0)
424  continue;
425  mac = (uint8_t *)(sll->sll_addr);
426 
427  char srcEthAddrStr[20];
428  sprintf(srcEthAddrStr, "%02x:%02x:%02x:%02x:%02x:%02x",
429  mac[0], mac[1], mac[2],
430  mac[3], mac[4], mac[5]);
431  memcpy(&srcEthAddr_, ether_aton(srcEthAddrStr), sizeof(srcEthAddr_)); //6 bytes
432  break;
433  }
434  freeifaddrs(ifaphead);
435  if (!ifap) {
436  HMBDC_THROW(std::runtime_error, "no local interface named " << iface);
437  }
438 }
439 
440 inline
441 void
442 SendTransportEngine::
443 initializePacket(struct pkt *pkt, int ttl, std::string srcIpStr, std::string dstIpStr
444  , ether_addr srcEthAddr, ether_addr dstEthAddr, uint16_t srcPort, uint16_t dstPort) {
445  struct ether_header *eh;
446  struct ip *ip;
447  struct udphdr *udp;
448  uint32_t a, b, c, d;
449  sscanf(srcIpStr.c_str(), "%d.%d.%d.%d", &a, &b, &c, &d);
450  auto srcIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
451  sscanf(dstIpStr.c_str(), "%d.%d.%d.%d", &a, &b, &c, &d);
452  auto dstIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
453 
454  /* prepare the headers */
455  eh = &pkt->eh;
456  bcopy(&srcEthAddr, eh->ether_shost, 6);
457  bcopy(&dstEthAddr, eh->ether_dhost, 6);
458 
459  eh->ether_type = htons(ETHERTYPE_IP);
460 
461 #pragma GCC diagnostic push
462 #if defined __clang__ || __GNUC_PREREQ(9,0)
463 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
464 #endif
465  ip = &pkt->ipv4.ip;
466  udp = &pkt->ipv4.udp;
467  ip->ip_v = IPVERSION;
468  ip->ip_hl = sizeof(*ip) >> 2;
469  ip->ip_id = 0;
470  ip->ip_tos = IPTOS_LOWDELAY;
471  ip->ip_len = 0; //zero so chksum can happen in ip_sum
472  ip->ip_id = 0;
473  ip->ip_off = htons(IP_DF); /* Don't fragment */
474  ip->ip_ttl = ttl;
475  ip->ip_p = IPPROTO_UDP;
476  ip->ip_dst.s_addr = htonl(dstIp);
477  ip->ip_src.s_addr = htonl(srcIp);
478  ip->ip_sum = 0;
479  ip->ip_len = sizeof(*ip) + sizeof(udphdr); //ip->ip_len is unknown, put known part
480  udp->source = htons(srcPort);
481  udp->dest = htons(dstPort);
482  udp->len = sizeof(udphdr); //put known part
483  udp->check = 0;
484 
485  bzero(&pkt->vh, sizeof(pkt->vh));
486 }
487 
488 inline
489 void
490 SendTransportEngine::
491 updatePacket(struct pkt *packet, size_t payloadWireSize, bool doChecksum) {
492  packet->ipv4.ip.ip_len += payloadWireSize; //already has sizeof(ip) + sizeof(udphdr);
493  packet->ipv4.ip.ip_len = ntohs(packet->ipv4.ip.ip_len);
494  if (doChecksum) {
495  packet->ipv4.ip.ip_sum = wrapsum(checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0));
496  }
497 
498  packet->ipv4.udp.len += payloadWireSize;
499  packet->ipv4.udp.len = htons(packet->ipv4.udp.len);
500  if (doChecksum) {
501  auto udp = &packet->ipv4.udp;
502  packet->ipv4.udp.check = wrapsum(
503  checksum(udp, sizeof(*udp), /* udp header */
504  checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
505  checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
506  IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))));
507  }
508 }
509 
510 #pragma GCC diagnostic pop
511 
512 } //sendtransportengine_detail
513 
514 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:79
power a netmap port sending functions
Definition: SendTransportEngine.hpp:46
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: SendTransportEngine.hpp:319
Definition: Base.hpp:12
void rotate()
if the user chooses not to have a Context to manage and run the engine this method can be called from a...
Definition: SendTransportEngine.hpp:69
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: Misc.h:9
Definition: TypedString.hpp:84
Definition: Misc.h:55
Definition: Misc.h:51
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:311
Definition: Message.hpp:212
a Node is a thread of execution that can subscribe and receive Messages
Definition: Node.hpp:51
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89