hmbdc
simplify-high-performance-messaging-programming
NmRecvTransport.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/rnetmap/Transport.hpp"
4 #include "hmbdc/app/Logger.hpp"
5 #include "hmbdc/comm/inet/Misc.hpp"
6 #include "hmbdc/MetaUtils.hpp"
7 
8 #include <boost/bind.hpp>
9 #include <boost/lexical_cast.hpp>
10 
11 #include <memory>
12 #include <type_traits>
13 
14 namespace hmbdc { namespace tips { namespace rnetmap {
15 
16 namespace nmrecvtransport_detail {
17 using namespace std;
18 using namespace hmbdc::app;
19 using namespace hmbdc::time;
20 using namespace hmbdc::comm::eth;
21 using namespace hmbdc::comm::inet;
22 
23 /**
24  * @brief impl class
25  *
26  * @tparam OutputBuffer type of buffer to hold resulting network messages
27  * @tparam Ep2SessionDict dictionary type searched by sender endpoint (the packet's
28  * source IPv4 address) to find the session that accepts that sender's messages
29  */
30 template <typename OutputBuffer, typename Ep2SessionDict>
32 : Transport {
33  using SELF = NmRecvTransport;
34 
35  NmRecvTransport(Config const& cfg
37  , TypeTagSet const& subscriptions
38  , Ep2SessionDict& sessionDict)
39  : Transport(cfg)
40  , doChecksum_(config_.getExt<bool>("doChecksum"))
41  , busyWait_(true)
42  , pollWaitTimeMillisec_(0)
43  , data_(nullptr)
44  , cmdBuffer_(cmdBuffer)
45  , bufCur_(nullptr)
46  , subscriptions_(subscriptions)
47  , sessionDict_(sessionDict) {
48  auto nmport = config_.getExt<std::string>("netmapPort");
49  busyWait_ = config_.getExt<bool>("busyWait");
50  if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<int>("pollWaitTimeMillisec");
51 
52  struct nmreq baseNmd;
53  bzero(&baseNmd, sizeof(baseNmd));
54  baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
55  config_(baseNmd.nr_tx_slots, "nmTxSlots");
56  config_(baseNmd.nr_rx_slots, "nmRxSlots");
57  config_(baseNmd.nr_tx_rings, "nmTxRings");
58  config_(baseNmd.nr_rx_rings, "nmRxRings");
59 
60  nmd_ = nm_open(nmport.c_str(), &baseNmd, config_.getExt<int>("nmOpenFlags"), NULL);
61  if (!nmd_) {
62  HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
63  }
64 
65  struct nmreq req;
66  memset(&req, 0, sizeof(req));
67  bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
68  req.nr_version = NETMAP_API;
69  req.nr_cmd = NETMAP_VNET_HDR_GET;
70  int err = ioctl(nmd_->fd, NIOCREGIF, &req);
71  if (err) {
72  HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
73  }
74  virtHeader_ = req.nr_arg1;
75 
76  //setting up poll - might be useful or not
77  pfd_.fd = nmd_->fd;
78  pfd_.events = POLLIN;
79  sleep(config_.getExt<int>("nmResetWaitSec"));
80  //cleanup rings
81  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
82  HMBDC_THROW(std::runtime_error, "IO error");
83  }
84  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
85  struct netmap_ring * rxring = NETMAP_TXRING(nmd_->nifp, i);
86  if (nm_ring_empty(rxring))
87  continue;
88  rxring->head = rxring->cur = rxring->tail;
89  }
90  }
91 
92  ~NmRecvTransport(){
93  nm_close(nmd_);
94  }
95 
96  void runOnce() HMBDC_RESTRICT {
97  syncNetmap();
98  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
99  struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
100  if (nm_ring_empty(rxring))
101  continue;
102 
103  recvPackets(rxring);
104  }
105  }
106 
107 private:
108  /**
109  * @brief sync using busy wait or poll depending on config
110  * @details it turns out busy wait performance is very poor when using vale
111  * poll works mostly, but it works well only when an enough timeout is given
112  * less than 10 milli wont work well
113  */
114  void syncNetmap() HMBDC_RESTRICT {
115  if (hmbdc_likely(busyWait_)) {
116  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
117  HMBDC_THROW(std::runtime_error, "IO error");
118  } else {
119  return;
120  }
121  } else {
122  auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
123  if (hmbdc_unlikely( res < 0)) {
124  HMBDC_THROW(std::runtime_error, "IO error errno=" << errno);
125  } else {
126  return;
127  }
128  }
129  }
130 
131  void recvPackets(struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
132  auto cur = ring->cur;
133 
134  while(cur != ring->tail) {
135  struct netmap_slot *slot = &ring->slot[cur];
136  auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
137  auto* p = reinterpret_cast<pkt*>(buf + virtHeader_ - sizeof(virt_header));
138 
139  if (p->ipv4.ip.ip_p == IPPROTO_UDP
140  && p->ipv4.ip.ip_off == htons(IP_DF)) {
141  if (!data_) {
142  data_ = (uint8_t*)p->ipv4.body;
143 #pragma GCC diagnostic push
144 #if defined __clang__ || __GNUC_PREREQ(9,0)
145 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
146 #endif
147  auto ip = &p->ipv4.ip;
148  senderEndpoint_ = ip->ip_src.s_addr;
149  }
150  while (data_ + sizeof(TransportMessageHeader) < buf + slot->len) {
151  auto header = reinterpret_cast<TransportMessageHeader*>(data_);
152  // HMBDC_LOG_DEBUG(header->typeTag());
153  // HMBDC_LOG_DEBUG(seq);
154  if (data_ + header->wireSize() <= buf + slot->len) {
155  if (hmbdc_unlikely(header->typeTag() == TypeTagBackupSource::typeTag)) {
156  auto it = cmdBuffer_.claim();
157  char* b = static_cast<char*>(*it);
158  size_t l = header->messagePayloadLen;
159  l = std::min(cmdBuffer_.maxItemSize(), l);
160  memcpy(b, header->payload(), l);
161  auto& bts = reinterpret_cast<MessageWrap<TypeTagBackupSource>*>(b)->payload;
162  bts.sendFrom = senderEndpoint_;
163  cmdBuffer_.commit(it);
164  } else {
165  auto session = sessionDict_.find(senderEndpoint_);
166  if (hmbdc_unlikely(session != sessionDict_.end())) {
167  auto a = session->second->accept(header);
168  if (a == 0) {
169  return; //wait for backup
170  }
171  }
172  } //else drop and move to next msg
173  data_ += header->wireSize();
174  } else {
175  break;
176  }
177  }
178  }
179  data_ = nullptr;
180  cur = nm_ring_next(ring, cur);
181  }
182 
183  ring->head = ring->cur = cur;
184  }
185 
186  static
187  bool verifyChecksum(pkt* HMBDC_RESTRICT packet, size_t payloadWireSize) {
188  {
189  auto tmp = packet->ipv4.ip.ip_sum;
190  packet->ipv4.ip.ip_sum = 0;
191  if (tmp != wrapsum(
192  checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0))) {
193  return false;
194  }
195  packet->ipv4.ip.ip_sum = tmp;
196  }
197 
198  {
199  auto tmp = packet->ipv4.udp.check;
200  packet->ipv4.udp.check = 0;
201 
202  auto udp = &packet->ipv4.udp;
203  if (tmp != wrapsum(
204  checksum(udp, sizeof(*udp), /* udp header */
205  checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
206  checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
207  IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
208  return false;
209  }
210 #pragma GCC diagnostic pop
211  packet->ipv4.udp.check = tmp;
212  return true;
213  }
214  }
215 
216  struct nm_desc *nmd_;
217  int virtHeader_; //v hdr len
218  bool doChecksum_;
219  struct pollfd pfd_;
220  bool busyWait_;
221  int pollWaitTimeMillisec_;
222  uint8_t* data_;
223 
225  size_t maxItemSize_;
226  uint32_t senderEndpoint_;
227  char buf_[4*1024];
228  char* bufCur_;
229  TypeTagSet const& subscriptions_;
230  Ep2SessionDict& sessionDict_;
231 };
232 
233 } //nmrecvtransport_detail
234 template <typename OutputBuffer, typename Ep2SessionDict>
235 using NmRecvTransport = nmrecvtransport_detail::NmRecvTransport<OutputBuffer, Ep2SessionDict>;
236 }}}
Definition: MonoLockFreeBuffer.hpp:16
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: Misc.h:9
Definition: TypedString.hpp:84
Definition: Misc.h:55
impl class
Definition: NmRecvTransport.hpp:31
Definition: Misc.h:51
void syncNetmap() HMBDC_RESTRICT
sync using busy wait or poll depending on config
Definition: NmRecvTransport.hpp:114
Definition: TypeTagSet.hpp:141
Definition: Message.hpp:263
Definition: Rater.hpp:10
Definition: Endpoint.hpp:16
Definition: Base.hpp:12