hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
8 
9 #include "hmbdc/tips/netmap/Messages.hpp"
10 #include "hmbdc/tips/netmap/DefaultUserConfig.hpp"
11 #include "hmbdc/tips/TypeTagSet.hpp"
12 #include "hmbdc/app/Base.hpp"
13 #include "hmbdc/comm/eth/Misc.h"
14 #include "hmbdc/Compile.hpp"
15 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
16 #include "hmbdc/MetaUtils.hpp"
17 
18 
19 #include <boost/lexical_cast.hpp>
20 #include <memory>
21 #include <type_traits>
22 #include <mutex>
23 
24 #include <netinet/ether.h> /* ether_aton */
25 #include <linux/if_packet.h> /* sockaddr_ll */
26 #include <sys/sysctl.h> /* sysctl */
27 #include <ifaddrs.h> /* getifaddrs */
28 #include <poll.h>
29 
30 namespace hmbdc { namespace tips { namespace netmap {
31 template <typename OutBuffer>
33 : app::Client<RecvTransportEngine<OutBuffer>> {
34  RecvTransportEngine(app::Config cfg, OutBuffer& outBuffer)
35  : config_((cfg.setAdditionalFallbackConfig(app::Config{DefaultUserConfig})
36  , cfg.resetSection("rx", false)))
37  , data_(nullptr)
38  , outBuffer_(outBuffer)
39  , maxMessageSize_(outBuffer.maxItemSize())
40  , doChecksum_(config_.getExt<bool>("doChecksum"))
41  , busyWait_(true)
42  , pollWaitTimeMillisec_(0) {
43  config_(hmbdcName_, "hmbdcName")
44  (schedPolicy_, "schedPolicy")
45  (schedPriority_, "schedPriority")
46  ;
47 
48  busyWait_ = config_.getExt<bool>("busyWait");
49  if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<int>("pollWaitTimeMillisec");
50 
51  struct nmreq baseNmd;
52  bzero(&baseNmd, sizeof(baseNmd));
53  baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
54  config_(baseNmd.nr_tx_slots, "nmTxSlots");
55  config_(baseNmd.nr_rx_slots, "nmRxSlots");
56  config_(baseNmd.nr_tx_rings, "nmTxRings");
57  config_(baseNmd.nr_rx_rings, "nmRxRings");
58  auto nmport = config_.getExt<std::string>("netmapPort");
59  uint32_t flags = config_.getExt<uint32_t>("nmOpenFlags");
60  nmd_ = nm_open(nmport.c_str(), &baseNmd, flags, NULL);
61  if (!nmd_) {
62  HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
63  }
64 
65  struct nmreq req;
66  memset(&req, 0, sizeof(req));
67  bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
68  req.nr_version = NETMAP_API;
69  req.nr_cmd = NETMAP_VNET_HDR_GET;
70  int err = ioctl(nmd_->fd, NIOCREGIF, &req);
71  if (err) {
72  HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
73  }
74  virtHeader_ = req.nr_arg1;
75 
76  //setting up poll - might be useful or not
77  pfd_.fd = nmd_->fd;
78  pfd_.events = POLLIN;
79  sleep(config_.getExt<int>("nmResetWaitSec"));
80  //cleanup rings
81  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
82  HMBDC_THROW(std::runtime_error, "IO error");
83  }
84  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
85  struct netmap_ring * rxring = NETMAP_TXRING(nmd_->nifp, i);
86  if (nm_ring_empty(rxring))
87  continue;
88  rxring->head = rxring->cur = rxring->tail;
89  }
90  }
91 
92  void rotate() {
94  }
95 
96 
98  if (nmd_) nm_close(nmd_);
99  }
100 
101  void invokedCb(size_t) HMBDC_RESTRICT override {
102  syncNetmap();
103  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
104  struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
105  if (nm_ring_empty(rxring))
106  continue;
107 
108  recvPackets(rxring);
109  }
110  }
111 
112  void stoppedCb(std::exception const& e) override {
113  HMBDC_LOG_C(e.what());
114  };
115 
116  char const* hmbdcName() const {
117  return this->hmbdcName_.c_str();
118  }
119 
120  std::tuple<char const*, int> schedSpec() const {
121  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
122  }
123 
124  template <MessageTupleC Messages, typename CcNode>
125  void subscribeFor(CcNode const& node, uint16_t mod, uint16_t res) {
126  subscriptions_.addSubsFor<Messages>(node, mod, res);
127  }
128 
129  template <MessageC Message>
130  void subscribe() {
131  subscriptions_.add(Message::typeTag);
132  }
133 
134 private:
135 
136  /**
137  * @brief sync using busy wait or poll depending on config
138  * @details it turns out busy wait performance is very poor when using vale
139  * poll works mostly, but it works well only when an enough timeout is given
140  * less than 10 milli wont work well
141  */
142  void syncNetmap() HMBDC_RESTRICT {
143  if (hmbdc_likely(busyWait_)) {
144  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
145  HMBDC_THROW(std::runtime_error, "IO error");
146  } else {
147  return;
148  }
149  } else {
150  auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
151  if (hmbdc_unlikely( res < 0)) {
152  HMBDC_THROW(std::runtime_error, "IO error errno=" << errno);
153  } else {
154  return;
155  }
156  }
157  }
158 
159  void recvPackets(struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
160  using namespace comm::eth;
161  auto cur = ring->cur;
162 
163  while(cur != ring->tail) {
164  struct netmap_slot *slot = &ring->slot[cur];
165  auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
166  auto* p = reinterpret_cast<pkt*>(buf + virtHeader_ - sizeof(virt_header));
167 
168  if (hmbdc_likely(p->ipv4.ip.ip_p == IPPROTO_UDP
169  && p->ipv4.ip.ip_off == htons(IP_DF))) {
170  if (!data_) data_ = (uint8_t*)p->ipv4.body;
171  while (data_ + sizeof(TransportMessageHeader) < buf + slot->len) {
172  auto header = reinterpret_cast<TransportMessageHeader*>(data_);
173  if (data_ + header->wireSize() <= buf + slot->len) {
174  if (subscriptions_.check(header->typeTag())) {
175  auto l = std::min(maxMessageSize_, (size_t)header->messagePayloadLen());
176  outBuffer_.put(header->payload(), l);
177  }
178  data_ += header->wireSize();
179  } else {
180  break;
181  }
182  }
183  }
184  data_ = nullptr;
185  cur = nm_ring_next(ring, cur);
186  }
187 
188  ring->head = ring->cur = cur;
189  }
190 
191  static
192  bool verifyChecksum(comm::eth::pkt* HMBDC_RESTRICT packet, size_t payloadWireSize) {
193  using namespace comm::eth;
194  {
195  auto tmp = packet->ipv4.ip.ip_sum;
196  packet->ipv4.ip.ip_sum = 0;
197  if (tmp != wrapsum(
198  checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0))) {
199  return false;
200  }
201  packet->ipv4.ip.ip_sum = tmp;
202  }
203 
204  {
205  auto tmp = packet->ipv4.udp.check;
206  packet->ipv4.udp.check = 0;
207 #pragma GCC diagnostic push
208 #ifdef __clang__
209 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
210 #endif
211  auto udp = &packet->ipv4.udp;
212  if (tmp != wrapsum(
213  checksum(udp, sizeof(*udp), /* udp header */
214  checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
215  checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
216  IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
217  return false;
218  }
219  packet->ipv4.udp.check = tmp;
220  return true;
221 #pragma GCC diagnostic pop
222  }
223  }
224  app::Config config_; // effective configuration: the caller's cfg with DefaultUserConfig as fallback, section reset to "rx"
225  std::string hmbdcName_; // read from config key "hmbdcName"; returned by hmbdcName()
226  std::string schedPolicy_; // read from config key "schedPolicy"; returned by schedSpec()
227  int schedPriority_; // read from config key "schedPriority"; returned by schedSpec()
228  uint8_t* data_; // parse cursor inside the packet recvPackets() is working on; reset to nullptr after every slot
229 
230  OutBuffer& HMBDC_RESTRICT outBuffer_; // destination for the payload of each subscribed message
231  size_t maxMessageSize_; // outBuffer.maxItemSize() captured at construction; caps the length passed to outBuffer_.put()
232  TypeTagSet subscriptions_; // type tags of the messages to forward; checked for every received message
233 
234 
235  struct nm_desc *nmd_; // netmap descriptor returned by nm_open(); released with nm_close()
236  int virtHeader_; // virtio-net header length, as reported by the NETMAP_VNET_HDR_GET request
237  bool doChecksum_; // config key "doChecksum"; NOTE(review): not consulted anywhere in the visible code - confirm whether verifyChecksum() should be gated on it
238  struct pollfd pfd_; // poll descriptor on nmd_->fd for POLLIN; used by syncNetmap() when busyWait_ is false
239  bool busyWait_; // config key "busyWait": true selects the NIOCRXSYNC ioctl, false selects poll()
240  int pollWaitTimeMillisec_; // poll() timeout in milliseconds; read from config only when busyWait_ is false, otherwise 0
241 };
242 
243 }}}
244 
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
class to hold an hmbdc configuration
Definition: Config.hpp:45
void setAdditionalFallbackConfig(Config const &c)
set additional defaults
Definition: Config.hpp:154
Definition: RecvTransportEngine.hpp:32
Config & resetSection(char const *section, bool sectionExists=true)
change section name
Definition: Config.hpp:177
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: RecvTransportEngine.hpp:112
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: RecvTransportEngine.hpp:101
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: Base.hpp:12
void syncNetmap() HMBDC_RESTRICT
sync using busy wait or poll depending on config
Definition: RecvTransportEngine.hpp:142