hmbdc
simplify-high-performance-messaging-programming
NmSendTransport.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #define NETMAP_WITH_LIBS
5 #include <net/netmap_user.h>
6 #undef NETMAP_WITH_LIBS
7 
8 #include "hmbdc/tips/rnetmap/Transport.hpp"
9 #include "hmbdc/tips/rnetmap/Messages.hpp"
10 #include "hmbdc/tips/reliable/BackupSendServerT.hpp"
11 #include "hmbdc/app/Base.hpp"
12 #include "hmbdc/comm/eth/Misc.h"
13 #include "hmbdc/comm/inet/Misc.hpp"
14 #include "hmbdc/time/Time.hpp"
15 #include "hmbdc/time/Rater.hpp"
16 #include "hmbdc/pattern/LockFreeBufferT.hpp"
17 
18 
19 #include <boost/bind.hpp>
20 #include <memory>
21 
22 #include <iostream>
23 
24 #include <netinet/ether.h> /* ether_aton */
25 #include <linux/if_packet.h> /* sockaddr_ll */
26 #include <sys/sysctl.h> /* sysctl */
27 #include <ifaddrs.h> /* getifaddrs */
28 
29 #include <poll.h>
30 
31 namespace hmbdc { namespace tips { namespace rnetmap {
32 
33 namespace nmsendtransport_detail {
// Names pulled into this detail namespace for the transport below; the using-directives
// are scoped to nmsendtransport_detail rather than the enclosing hmbdc::tips::rnetmap.
34 using namespace std;
35 using namespace hmbdc::time;
36 
37 using namespace hmbdc::app;
38 using namespace hmbdc::pattern;
39 using namespace hmbdc::comm::eth;
40 using namespace hmbdc::comm::inet;
41 
// NOTE(review): source line 42 is absent from this listing. `Buffer`, which the
// NmSendTransport constructor takes by reference, is not declared anywhere in the visible
// text — it may have been declared on that line or come from Transport; confirm.
// Alias for the queue that receives memory-attachment cleanup records from sendPackets().
43 using ToCleanupAttQueue = reliable::ToCleanupAttQueue;
44 
46 : Transport {
47  NmSendTransport(Config const& cfg
48  , size_t maxMessageSize
49  , Buffer& buffer
50  , Rater& rater
51  , ToCleanupAttQueue& toCleanupAttQueue)
52  : Transport(cfg)
53  , nmd_(nullptr)
54  , virtHeader_(0)
55  , srcEthAddr_{{0}}
56  , dstEthAddr_{{0}}
57  , doChecksum_(config_.getExt<bool>("doChecksum"))
58  , maxMessageSize_(maxMessageSize)
59  , buffer_(buffer)
60  , rater_(rater)
61  , toCleanupAttQueue_(toCleanupAttQueue)
62  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
63  , adPending_(false)
64  , seqAlertPending_(false)
65  , seqAlert_(nullptr)
66  , startSending_(false) {
67  if (maxMessageSize_ > mtu_) {
68  HMBDC_THROW(std::out_of_range, "mtu needs to >= " << maxMessageSize_);
69  }
70  if (maxMessageSize_ > TransportMessageHeader::maxPayloadSize()) {
71  HMBDC_THROW(std::out_of_range, "maxMessageSize_ needs to <=" << TransportMessageHeader::maxPayloadSize());
72  }
73 
74  memcpy(&srcEthAddr_, ether_aton(config_.getExt<std::string>("srcEthAddr").c_str())
75  , sizeof(srcEthAddr_));
76  memcpy(&dstEthAddr_, ether_aton(config_.getExt<std::string>("dstEthAddr").c_str())
77  , sizeof(dstEthAddr_));
78  getMacAddresses();
79 
80  struct nmreq baseNmd;
81  bzero(&baseNmd, sizeof(baseNmd));
82  baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
83  config_(baseNmd.nr_tx_slots, "nmTxSlots");
84  config_(baseNmd.nr_rx_slots, "nmRxSlots");
85  config_(baseNmd.nr_tx_rings, "nmTxRings");
86  config_(baseNmd.nr_rx_rings, "nmRxRings");
87 
88  auto nmport = cfg.getExt<std::string>("netmapPort");
89  nmd_ = nm_open(nmport.c_str(), &baseNmd
90  , cfg.getExt<uint64_t>("nmOpenFlags"), NULL);
91  if (!nmd_) {
92  HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
93  }
94  if (nmd_->first_tx_ring != nmd_->last_tx_ring) {
95  HMBDC_THROW(std::out_of_range
96  , "multiple tx rings exist on " << nmport
97  << ". use more specific netmapPort. for example: netmap::p2p1-2 ");
98  }
99  struct nmreq req;
100  memset(&req, 0, sizeof(req));
101  bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
102  req.nr_version = NETMAP_API;
103  req.nr_cmd = NETMAP_VNET_HDR_GET;
104  int err = ioctl(nmd_->fd, NIOCREGIF, &req);
105  if (err) {
106  HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
107  }
108  virtHeader_ = req.nr_arg1;
109  auto srcIpStr = config_.getExt<string>("srcIp");
110  if (srcIpStr == "tcpIfaceAddr") {
111  srcIpStr = getLocalIpMatchMask(config_.getExt<string>("tcpIfaceAddr"));
112  }
113  initializePacket(&precalculatedPacketHead_
114  , config_.getExt<uint16_t>("ttl")
115  , srcIpStr
116  , config_.getExt<string>("dstIp")
117  , srcEthAddr_
118  , dstEthAddr_
119  , config_.getExt<uint16_t>("srcPort")
120  , config_.getExt<uint16_t>("dstPort")
121  );
122  sleep(config_.getExt<int>("nmResetWaitSec"));
123  //cleanup rings
124  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
125  HMBDC_THROW(std::runtime_error, "IO error");
126  }
127  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, nmd_->first_tx_ring);
128  txring->head = txring->cur = txring->tail;
129 
130  auto addr = seqAlertBuf_;
131  auto h = new (addr) TransportMessageHeader;
132  new (addr + sizeof(TransportMessageHeader)) MessageWrap<SeqAlert>();
133  h->messagePayloadLen = sizeof(MessageWrap<SeqAlert>);
134  h->setSeq(std::numeric_limits<HMBDC_SEQ_TYPE>::max());
135  seqAlert_ = &(h->wrapped<SeqAlert>());
136  }
137 
138  ~NmSendTransport(){
139  nm_close(nmd_);
140  }
141 
142  void startSend(){
143  startSending_ = true;
144  }
145 
// setAds: rebuild the advertisement buffers from `ads`. Each ad gets one fixed-size
// buffer holding a TransportMessageHeader followed by the wrapped payload; the header's
// payload length is sizeof(MessageWrap<TypeTagBackupSource>). The finished set replaces
// adBufs_ through std::swap, so adBufs_ is never observed half-built.
146  template <typename AdvertisingMessages>
147  void setAds(AdvertisingMessages const& ads) {
148  decltype(adBufs_) newAdBufs;
149  for (auto const& ad : ads) {
150  newAdBufs.emplace_back();
151  auto addr = newAdBufs.rbegin()->data();
152  auto h = new (addr) TransportMessageHeader;
// NOTE(review): source line 154 is missing from this listing, so the placement-new
// statement below is incomplete as shown. Judging by the payload size set right after,
// it presumably constructs a MessageWrap<TypeTagBackupSource> from `ad` — confirm
// against the real source before building.
153  new (addr + sizeof(TransportMessageHeader))
155  h->messagePayloadLen = sizeof(MessageWrap<TypeTagBackupSource>);
// seq is set to the max HMBDC_SEQ_TYPE value (the constructor does the same for the
// SeqAlert); presumably marks an out-of-band message — TODO confirm
156  h->setSeq(std::numeric_limits<HMBDC_SEQ_TYPE>::max());
157  }
158  std::swap(adBufs_, newAdBufs);
159  }
160 
161  void setAdPending() {
162  adPending_ = true;
163  }
164 
165  void setSeqAlertPending() {
166  seqAlertPending_ = true;
167  }
168 
169  void runOnce(size_t sessionCount) HMBDC_RESTRICT {
170  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, nmd_->first_tx_ring);
171  if (!nm_ring_empty(txring)) {
172  sendPackets(txring, sessionCount);
173  }
174  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
175  HMBDC_THROW(std::runtime_error, "IO error");
176  }
177  }
178 
179 private:
// netmap port descriptor from nm_open() in the constructor; nm_close()d by the destructor
180  struct nm_desc *nmd_;
// nr_arg1 returned by the NETMAP_VNET_HDR_GET request in the constructor
181  int virtHeader_; //v hdr len
// MAC addresses for the ethernet header; getMacAddresses() may replace the source
182  ether_addr srcEthAddr_;
183  ether_addr dstEthAddr_;
// header template built by initializePacket(); sendPackets() copies it to the front of each slot
184  pkt precalculatedPacketHead_;
// "doChecksum" config: whether updatePacket() computes the IP/UDP checksums
185  bool doChecksum_;
186 
// NOTE(review): source line 187 is missing from this listing; confirm whether it declared a member
// largest accepted message, validated against mtu_ and maxPayloadSize() in the constructor
188  size_t maxMessageSize_;
189 
// outgoing messages; sendPackets() peeks them and consumes what it sent (or discards them
// when there are no sessions)
190  Buffer& buffer_;
// per-message check/commit for buffered messages
191  Rater& HMBDC_RESTRICT rater_;
// receives (seq, attachment, cleanup func) tuples for StartMemorySegTrain messages as they are sent
192  ToCleanupAttQueue& HMBDC_RESTRICT toCleanupAttQueue_;
// max number of items packed into one slot/datagram (also scales the buffer peek limit)
193  size_t maxSendBatch_;
// one header + MessageWrap<TypeTagBackupSource> buffer per advertisement, built by setAds()
194  std::vector<std::array<char, sizeof(TransportMessageHeader)
195  + sizeof(MessageWrap<TypeTagBackupSource>)>> adBufs_;
// set by setAdPending(), cleared by sendPackets() when it takes the ads
196  bool adPending_;
// header + MessageWrap<SeqAlert>, built once in the constructor; seqAlert_ points at its payload
197  char seqAlertBuf_[sizeof(TransportMessageHeader) + sizeof(MessageWrap<SeqAlert>)];
// set by setSeqAlertPending(), cleared by sendPackets()
198  bool seqAlertPending_;
199  SeqAlert* seqAlert_;
// set by startSend(); until then buffered messages are left in buffer_
200  bool startSending_;
201 
202  void sendPackets(struct netmap_ring * HMBDC_RESTRICT ring, size_t sessionCount) HMBDC_RESTRICT {
203  uint32_t cur = ring->cur;
204  size_t slotRemaining = 0;
205  bool slotNotInited = true;
206  auto batch = maxSendBatch_;
207  pkt* currentPktPtr = nullptr;
208  size_t slotWireSize = 0;
209  struct netmap_slot *slot = &ring->slot[cur];
210  typename Buffer::iterator begin, end;
211  auto limit = maxSendBatch_ * ((ring->tail - ring->cur) % ring->num_slots);
212  buffer_.peek(0, begin, end, limit);
213  if (hmbdc_unlikely(startSending_ && !sessionCount)) {
214  buffer_.wasteAfterPeek(0u, end - begin);
215  begin = end;
216  }
217  auto it = begin;
218  for(;;) {
219  bool processBuffer = false;
220  TransportMessageHeader* th = nullptr;
221  size_t wireSize = 0;
222  if (hmbdc_unlikely(adPending_)) {
223  for (auto& adBuf : adBufs_) {
224  th = reinterpret_cast<TransportMessageHeader*>(adBuf.data());
225  wireSize += sizeof(adBuf);
226  }
227  adPending_ = false;
228  } else if (it != end && startSending_) {
229  th = reinterpret_cast<TransportMessageHeader*>(*it);
230  wireSize = th->wireSize();
231  processBuffer = true;
232  } else if (hmbdc_unlikely(seqAlertPending_)) {
233  if (begin == end) {
234  th = reinterpret_cast<TransportMessageHeader*>(seqAlertBuf_);
235  wireSize = sizeof(seqAlertBuf_);
236  }
237  seqAlertPending_ = false;
238  }
239  //else wireSize == 0
240  bool raterOk = wireSize && (!processBuffer || rater_.check(wireSize));
241  if (!raterOk) break;
242  char *p = NETMAP_BUF(ring, slot->buf_idx);
243  if (slotNotInited) {
244  slotRemaining = min((size_t)ring->nr_buf_size, mtu_);
245  auto headWireSize = (uint16_t)(
246  sizeof(ether_header) + sizeof(::ip) + sizeof(udphdr) + virtHeader_
247  );
248  memcpy(p, ((char*)&precalculatedPacketHead_) + sizeof(virt_header) - virtHeader_
249  , headWireSize);
250  currentPktPtr = (pkt*)(p + virtHeader_ - sizeof(virt_header));
251  slotRemaining -= headWireSize;
252  slotWireSize = headWireSize;
253  slotNotInited = false;
254  }
255  if (slotRemaining >= wireSize) {
256  if (hmbdc_likely(processBuffer)) {
257  if (hmbdc_unlikely(th->typeTag() == MemorySeg::typeTag)) {
258  auto l = (int)(wireSize - th->wireSizeMemorySeg());
259  memcpy(p + slotWireSize, th->wireBytes(), l);
260  memcpy(p + slotWireSize + l, th->wireBytesMemorySeg(), th->wireSizeMemorySeg());
261  } else {
262  if (hmbdc_unlikely(th->typeTag() == StartMemorySegTrain::typeTag)) {
263  auto& trainHead = th->template wrapped<StartMemorySegTrain>();
264  auto itActual = it; itActual.seq_ += trainHead.segCount + 1;
265  auto actual = static_cast<TransportMessageHeader*>(*itActual);
266  toCleanupAttQueue_.push_back(make_tuple(itActual.seq_
267  , &actual->template wrapped<hasMemoryAttachment>()
268  , trainHead.att.afterConsumedCleanupFunc));
269  }
270  memcpy(p + slotWireSize, th, (int)wireSize);
271  }
272 
273  seqAlert_->expectSeq = it.seq_ + 1;
274  ++it;
275  rater_.commit();
276  } else {
277  memcpy(p + slotWireSize, th, (int)wireSize);
278  }
279  slotRemaining -= wireSize;
280  slotWireSize += wireSize;
281  batch--;
282  } else {
283  batch = 0; //this batch is done
284  }
285  if (!batch || it == end) {
286  size_t wireSizeExcludingHead = slotWireSize
287  - (sizeof(ether_header) + sizeof(::ip) + sizeof(udphdr) + virtHeader_);
288  updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
289  slot->len = slotWireSize;
290  cur = nm_ring_next(ring, cur);
291  slot = &ring->slot[cur];
292  slotWireSize = 0;
293  if (cur == ring->tail || !raterOk) break;
294  //new slot now
295  batch = maxSendBatch_;
296  slotNotInited = true;
297  }
298  if (it == end) break;
299  }
300 
301  if (slotWireSize) {
302  size_t wireSizeExcludingHead = slotWireSize
303  - (sizeof(ether_header) + sizeof(::ip) + sizeof(udphdr) + virtHeader_);
304  updatePacket(currentPktPtr, wireSizeExcludingHead, doChecksum_);
305  slot->len = slotWireSize;
306  cur = nm_ring_next(ring, cur);
307  }
308 
309  ring->head = ring->cur = cur;
310  buffer_.wasteAfterPeek(0u, it - begin);
311  }
312 
313  void getMacAddresses() {
314  auto nmport = config_.getExt<std::string>("netmapPort");
315 
316  if (strncmp(nmport.c_str(), "vale", 4) == 0) return;
317 
318  if (nmport.find_first_of(":") == std::string::npos) {
319  HMBDC_THROW(std::runtime_error
320  , "wrong netmapPort format " << nmport << " (examples: netmap:eth0, netmap:eth0-0)");
321  }
322  auto iface = nmport.substr(nmport.find_first_of(":"));
323  iface = iface.substr(1, iface.find_first_of("-^") - 1);
324 
325 
326  struct ifaddrs *ifaphead, *ifap;
327  int l = sizeof(ifap->ifa_name);
328 
329  if (getifaddrs(&ifaphead) != 0) {
330  HMBDC_THROW(std::runtime_error, "getifaddrs failed for" << iface);
331  }
332  for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
333  struct sockaddr_ll *sll =
334  (struct sockaddr_ll *)ifap->ifa_addr;
335  uint8_t *mac;
336 
337  if (!sll || sll->sll_family != AF_PACKET)
338  continue;
339  if (strncmp(ifap->ifa_name, iface.c_str(), l) != 0)
340  continue;
341  mac = (uint8_t *)(sll->sll_addr);
342 
343  char srcEthAddrStr[20];
344  sprintf(srcEthAddrStr, "%02x:%02x:%02x:%02x:%02x:%02x",
345  mac[0], mac[1], mac[2],
346  mac[3], mac[4], mac[5]);
347  memcpy(&srcEthAddr_, ether_aton(srcEthAddrStr), sizeof(srcEthAddr_)); //6 bytes
348  break;
349  }
350  freeifaddrs(ifaphead);
351  if (!ifap) {
352  HMBDC_THROW(std::runtime_error, "no local interface named " << iface);
353  }
354  }
355  static
356  void initializePacket(struct pkt *pkt, int ttl, std::string srcIpStr, std::string dstIpStr
357  , ether_addr srcEthAddr, ether_addr dstEthAddr, uint16_t srcPort, uint16_t dstPort) {
358  struct ether_header *eh;
359  struct ip *ip;
360  struct udphdr *udp;
361  uint32_t a, b, c, d;
362  sscanf(srcIpStr.c_str(), "%d.%d.%d.%d", &a, &b, &c, &d);
363  auto srcIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
364  sscanf(dstIpStr.c_str(), "%d.%d.%d.%d", &a, &b, &c, &d);
365  auto dstIp = (a << 24u) + (b << 16u) + (c << 8u) + d;
366 
367  /* prepare the headers */
368  eh = &pkt->eh;
369  bcopy(&srcEthAddr, eh->ether_shost, 6);
370  bcopy(&dstEthAddr, eh->ether_dhost, 6);
371 
372  eh->ether_type = htons(ETHERTYPE_IP);
373 
374 #pragma GCC diagnostic push
375 #if defined __clang__ || __GNUC_PREREQ(9,0)
376 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
377 #endif
378  ip = &pkt->ipv4.ip;
379  udp = &pkt->ipv4.udp;
380  ip->ip_v = IPVERSION;
381  ip->ip_hl = sizeof(*ip) >> 2;
382  ip->ip_id = 0;
383  ip->ip_tos = IPTOS_LOWDELAY;
384  ip->ip_len = 0; //zero so chksum can happen in ip_sum
385  ip->ip_id = 0;
386  ip->ip_off = htons(IP_DF); /* Don't fragment */
387  ip->ip_ttl = ttl;
388  ip->ip_p = IPPROTO_UDP;
389  ip->ip_dst.s_addr = htonl(dstIp);
390  ip->ip_src.s_addr = htonl(srcIp);
391  ip->ip_sum = 0;
392  ip->ip_len = sizeof(*ip) + sizeof(udphdr); //ip->ip_len is unknown, put known part
393  udp->source = htons(srcPort);
394  udp->dest = htons(dstPort);
395  udp->len = sizeof(udphdr); //put known part
396  udp->check = 0;
397 
398  bzero(&pkt->vh, sizeof(pkt->vh));
399  }
400 
401  static
402  void updatePacket(struct pkt *packet, size_t payloadWireSize, bool doChecksum = true) {
403  packet->ipv4.ip.ip_len += payloadWireSize; //already has sizeof(ip) + sizeof(udphdr);
404  packet->ipv4.ip.ip_len = ntohs(packet->ipv4.ip.ip_len);
405  if (doChecksum) {
406  packet->ipv4.ip.ip_sum = wrapsum(checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0));
407  }
408 
409  packet->ipv4.udp.len += payloadWireSize;
410  packet->ipv4.udp.len = htons(packet->ipv4.udp.len);
411  if (doChecksum) {
412  auto udp = &packet->ipv4.udp;
413  packet->ipv4.udp.check = wrapsum(
414  checksum(udp, sizeof(*udp), /* udp header */
415  checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
416  checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
417  IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))));
418  }
419  }
420 };
421 
422 #pragma GCC diagnostic pop
423 } //nmsendtransport_detail
425 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: Messages.hpp:185
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: Misc.h:9
Definition: TypedString.hpp:84
Definition: Misc.h:55
Definition: Misc.h:51
Definition: BlockingBuffer.hpp:11
Definition: Message.hpp:263
Definition: Rater.hpp:10
Definition: Endpoint.hpp:16
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89