hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
8 
9 #include "hmbdc/app/netmap/Messages.hpp"
10 #include "hmbdc/app/Base.hpp"
11 #include "hmbdc/comm/Topic.hpp"
12 #include "hmbdc/comm/eth/Misc.h"
13 #include "hmbdc/Compile.hpp"
14 #include "hmbdc/text/StringTrieSet.hpp"
15 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
16 #include "hmbdc/MetaUtils.hpp"
17 
18 
19 #include <boost/lexical_cast.hpp>
20 #include <memory>
21 #include <type_traits>
22 #include <mutex>
23 
24 #include <netinet/ether.h> /* ether_aton */
25 #include <linux/if_packet.h> /* sockaddr_ll */
26 #include <sys/sysctl.h> /* sysctl */
27 #include <ifaddrs.h> /* getifaddrs */
28 #include <poll.h>
29 
30 namespace hmbdc { namespace app { namespace netmap {
31 
32 struct NetContext;
33 /**
34  * @brief power a netmap port receiving functions
35  * @details this needs to be created using NetContext and start in an app::Context
36  */
37 struct RecvTransport {
38  using ptr = std::shared_ptr<RecvTransport>;
39  virtual ~RecvTransport(){}
40  /**
41  * @brief a take all arbitrator (no arbitration at all)
42  * @details it provides the default type for arbitration which does nothing
43  * it also provides a template on writing a user defined arbitrator
44  *
45  * @param h handle to retrieve what is inside of the message
46  * @return always returns 1 here
47  * In general: 1 keep the message; -1 to drop;
48  * 0 cannot decide, ask me later.
49  */
50  struct NoOpArb {
51  int operator()(TransportMessageHeader const* h) {
52  return 1; //always keep it
53  }
54  };
55 private:
56  friend struct hmbdc::app::netmap::NetContext; //only be used by NetContext
57  virtual void listenTo(comm::Topic const& t) = 0;
58  virtual void stopListenTo(comm::Topic const& t) = 0;
59 };
60 
61 /**
62  * @brief impl class,
63  * @details this needs to be created using NetContext and start in an app::Context
64  *
65  * @tparam OutputBuffer type of buffer to hold resulting network messages
66  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, suited to arbitrate
67  * between different recv transports. By default, keeping all
68  */
69 template <typename OutBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
72 , Client<RecvTransportEngine<OutBuffer, MsgArbitrator>> {
73 private:
74  using MD = MessageDispacher<RecvTransportEngine, std::tuple<Subscribe, Unsubscribe>>;
75  friend struct hmbdc::app::netmap::NetContext; //only be created by NetContext
76 
77 /**
78  * @brief ctor
79  *
80  * @param cfg specify the details of the netmap transport
81  * @param outputBuffer holding the results
82  * @param arb arbitrator instance to decide which messages to drop and keep; it supports either
83  * raw netmap (udp) packet (BEFORE topic filtering) or hmbdc message (AFTER topic filtering) level
84  * arbitration depending on which one of
85  * int operator()(void* bytes, size_t len) or
86  * int operator()(TransportMessageHeader const* header) presents in the arb
87  */
88  RecvTransportEngine(Config const& config, OutBuffer& outBuffer
89  , MsgArbitrator arb = NoOpArb())
90  : config_(config)
91  , data_(nullptr)
92  , buffer_(std::max(sizeof(MessageWrap<Subscribe>), sizeof(MessageWrap<Unsubscribe>))
93  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
94  , outBuffer_(outBuffer)
95  , maxMessageSize_(outBuffer.maxItemSize())
96  , doChecksum_(config_.getExt<bool>("doChecksum"))
97  , busyWait_(true)
98  , pollWaitTimeMillisec_(0)
99  , arb_(arb) {
100  config (hmbdcName_, "hmbdcName")
101  (schedPolicy_, "schedPolicy")
102  (schedPriority_, "schedPriority")
103  ;
104 
105  busyWait_ = config_.getExt<bool>("busyWait");
106  if (!busyWait_) pollWaitTimeMillisec_ = config_.getExt<int>("pollWaitTimeMillisec");
107 
108  struct nmreq baseNmd;
109  bzero(&baseNmd, sizeof(baseNmd));
110  baseNmd.nr_flags |= NR_ACCEPT_VNET_HDR;
111  config_(baseNmd.nr_tx_slots, "nmTxSlots");
112  config_(baseNmd.nr_rx_slots, "nmRxSlots");
113  config_(baseNmd.nr_tx_rings, "nmTxRings");
114  config_(baseNmd.nr_rx_rings, "nmRxRings");
115  auto nmport = config_.getExt<std::string>("netmapPort");
116  uint32_t flags = config_.getExt<uint32_t>("nmOpenFlags");
117  nmd_ = nm_open(nmport.c_str(), &baseNmd, flags, NULL);
118  if (!nmd_) {
119  HMBDC_THROW(std::runtime_error, "cannot open " << nmport);
120  }
121 
122  struct nmreq req;
123  memset(&req, 0, sizeof(req));
124  bcopy(nmd_->req.nr_name, req.nr_name, sizeof(req.nr_name));
125  req.nr_version = NETMAP_API;
126  req.nr_cmd = NETMAP_VNET_HDR_GET;
127  int err = ioctl(nmd_->fd, NIOCREGIF, &req);
128  if (err) {
129  HMBDC_THROW(std::runtime_error, "Unable to get virtio-net header length");
130  }
131  virtHeader_ = req.nr_arg1;
132 
133  //setting up poll - might be useful or not
134  pfd_.fd = nmd_->fd;
135  pfd_.events = POLLIN;
136  sleep(config_.getExt<int>("nmResetWaitSec"));
137  //cleanup rings
138  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
139  HMBDC_THROW(std::runtime_error, "IO error");
140  }
141  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
142  struct netmap_ring * rxring = NETMAP_TXRING(nmd_->nifp, i);
143  if (nm_ring_empty(rxring))
144  continue;
145  rxring->head = rxring->cur = rxring->tail;
146  }
147  }
148 
149  void listenTo(Topic const& t) override {
150  buffer_.put(MessageWrap<Subscribe>{t});
151  }
152 
153  void stopListenTo(Topic const& t) override {
154  buffer_.put(MessageWrap<Unsubscribe>{t});
155  }
156 
157 public:
158 
159 /**
160  * @brief if the user chooses not to have a Context manage and run the engine,
161  * this method can be called from any thread from time to time to have the engine
162  * do its job
163  * @details thread safe and non-blocking, if the engine is already being powered by a Context,
164  * this method has no effect
165  */
166  void rotate() {
167  if (runLock_.try_lock()) {
169  runLock_.unlock();
170  }
171  }
172 
173 /**
174  * @brief start the show by schedule the message recv
175  */
176  /*virtual*/
177  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
178  runLock_.lock();
179  };
180 
182  this->runLock_.unlock();
183  if (nmd_) nm_close(nmd_);
184  }
185 
186  /*virtual*/
187  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
189  auto n = buffer_.peek(begin, end);
190  auto it = begin;
191  while (it != end) {
192  MD()(*this, *static_cast<MessageHead*>(*it++));
193  }
194  buffer_.wasteAfterPeek(begin, n);
195 
196  syncNetmap();
197  for (int i = nmd_->first_rx_ring; i <= nmd_->last_rx_ring; i++) {
198  struct netmap_ring * rxring = NETMAP_RXRING(nmd_->nifp, i);
199  if (nm_ring_empty(rxring))
200  continue;
201 
202  recvPackets(rxring);
203  }
204  }
205 
206  void handleMessageCb(Subscribe const& t) {
207  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
208  }
209 
210  void handleMessageCb(Unsubscribe const& t) {
211  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
212  }
213 
214  void stoppedCb(std::exception const& e) override {
215  HMBDC_LOG_C(e.what());
216  };
217 
218  char const* hmbdcName() const {
219  return this->hmbdcName_.c_str();
220  }
221 
222  std::tuple<char const*, int> schedSpec() const {
223  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
224  }
225 
226 private:
227  template <bool is_raw_arb>
228  typename std::enable_if<is_raw_arb, int>::type applyRawArb(size_t len) {
229  return arb_(data_, len);
230  }
231 
232  template <bool is_raw_arb>
233  typename std::enable_if<!is_raw_arb, int>::type applyRawArb(size_t) {
234  return 1;
235  }
236 
237  template <bool is_raw_arb>
238  typename std::enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
239  return arb_(h);
240  }
241 
242  template <bool is_raw_arb>
243  typename std::enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
244  return 1;
245  }
246 
247  /**
248  * @brief sync using busy wait or poll depending on config
249  * @details it turns out busy wait performance is very poor when using vale
250  * poll works mostly, but it works well only when an enough timeout is given
251  * less than 10 milli wont work well
252  */
253  void syncNetmap() HMBDC_RESTRICT {
254  if (hmbdc_likely(busyWait_)) {
255  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCRXSYNC, NULL) < 0)) {
256  HMBDC_THROW(std::runtime_error, "IO error");
257  } else {
258  return;
259  }
260  } else {
261  auto res = poll(&pfd_, 1, pollWaitTimeMillisec_);
262  if (hmbdc_unlikely( res < 0)) {
263  HMBDC_THROW(std::runtime_error, "IO error errno=" << errno);
264  } else {
265  return;
266  }
267  }
268  }
269 
270  void recvPackets(struct netmap_ring * HMBDC_RESTRICT ring) HMBDC_RESTRICT {
271  using namespace comm::eth;
272  auto cur = ring->cur;
273 
274  while(cur != ring->tail) {
275  struct netmap_slot *slot = &ring->slot[cur];
276  auto *buf = (uint8_t*)NETMAP_BUF(ring, slot->buf_idx);
277  auto* p = reinterpret_cast<pkt*>(buf + virtHeader_ - sizeof(virt_header));
278 
279  if (hmbdc_likely(p->ipv4.ip.ip_p == IPPROTO_UDP
280  && p->ipv4.ip.ip_off == htons(IP_DF))) {
281  if (!data_) data_ = (uint8_t*)p->ipv4.body;
282 
283  using traits =
284  function_traits<typename std::remove_reference<MsgArbitrator>::type>;
285  using arg0 = typename traits::template arg<0>::type;
286  bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
287  auto a = applyRawArb<is_raw_arb>(slot->len);
288  if (hmbdc_unlikely(a == 0)) {
289  //keep data_ unchanged
290  ring->head = ring->cur = cur;
291  return;
292  } else if (hmbdc_likely(a > 0)) {
293  while (data_ + sizeof(TransportMessageHeader) < buf + slot->len) {
294  auto header = reinterpret_cast<TransportMessageHeader*>(data_);
295  if (data_ + header->wireSize() <= buf + slot->len) {
296  if (subscriptions_.check(header->topic())) {
297  auto a = applyArb<is_raw_arb>(header);
298  if (hmbdc_likely(a > 0)) {
299  auto l = std::min(maxMessageSize_, (size_t)header->messagePayloadLen());
300  outBuffer_.put(header->payload(), l);
301  } else if (hmbdc_unlikely(a == 0)) {
302  ring->head = ring->cur = cur;
303  return;
304  } //else drop and move to next msg
305  }
306  data_ += header->wireSize();
307  } else {
308  break;
309  }
310  }
311  } //else drop the packet
312  }
313  data_ = nullptr;
314  cur = nm_ring_next(ring, cur);
315  }
316 
317  ring->head = ring->cur = cur;
318  }
319 
320  static
321  bool verifyChecksum(comm::eth::pkt* HMBDC_RESTRICT packet, size_t payloadWireSize) {
322  using namespace comm::eth;
323  {
324  auto tmp = packet->ipv4.ip.ip_sum;
325  packet->ipv4.ip.ip_sum = 0;
326  if (tmp != wrapsum(
327  checksum(&packet->ipv4.ip, sizeof(packet->ipv4.ip), 0))) {
328  return false;
329  }
330  packet->ipv4.ip.ip_sum = tmp;
331  }
332 
333  {
334  auto tmp = packet->ipv4.udp.check;
335  packet->ipv4.udp.check = 0;
336 #pragma GCC diagnostic push
337 #ifdef __clang__
338 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
339 #endif
340  auto udp = &packet->ipv4.udp;
341  if (tmp != wrapsum(
342  checksum(udp, sizeof(*udp), /* udp header */
343  checksum(packet->ipv4.body, payloadWireSize, /* udp payload */
344  checksum(&packet->ipv4.ip.ip_src, 2 * sizeof(packet->ipv4.ip.ip_src), /* pseudo header */
345  IPPROTO_UDP + (u_int32_t)ntohs(udp->len)))))) {
346  return false;
347  }
348  packet->ipv4.udp.check = tmp;
349  return true;
350 #pragma GCC diagnostic pop
351  }
352  }
353  Config config_;
354  std::string hmbdcName_;
355  std::string schedPolicy_;
356  int schedPriority_;
357  uint8_t* data_;
358 
360  OutBuffer& HMBDC_RESTRICT outBuffer_;
361  size_t maxMessageSize_;
362  text::StringTrieSet subscriptions_;
363 
364 
365  struct nm_desc *nmd_;
366  int virtHeader_; //v hdr len
367  bool doChecksum_;
368  struct pollfd pfd_;
369  bool busyWait_;
370  int pollWaitTimeMillisec_;
371  MsgArbitrator arb_;
372  std::mutex runLock_;
373 };
374 
375 }}}
376 
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:225
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: RecvTransportEngine.hpp:214
Definition: Messages.hpp:111
Definition: MonoLockFreeBuffer.hpp:15
power a netmap port receiving functions
Definition: RecvTransportEngine.hpp:37
class to hold an hmbdc configuration
Definition: Config.hpp:46
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by scheduling the message receiving
Definition: RecvTransportEngine.hpp:177
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:76
Definition: Misc.h:55
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:169
Definition: Message.hpp:78
Definition: Messages.hpp:118
impl class,
Definition: RecvTransportEngine.hpp:70
RecvTransportEngine(Config const &config, OutBuffer &outBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:88
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:112
void syncNetmap() HMBDC_RESTRICT
sync using busy wait or poll depending on config
Definition: RecvTransportEngine.hpp:253
void rotate()
if the user chooses not to have a Context manage and run the engine, this method can be called from a...
Definition: RecvTransportEngine.hpp:166
a singleton that holds netmap resources
Definition: NetContext.hpp:38
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:50
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object can participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:57
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:181
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: RecvTransportEngine.hpp:187
Definition: Base.hpp:13
Definition: MetaUtils.hpp:9
Definition: LockFreeBufferMisc.hpp:89