hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #define NETMAP_WITH_LIBS
6 #include <net/netmap_user.h>
7 #undef NETMAP_WITH_LIBS
8 
9 #include "hmbdc/app/netmap/Messages.hpp"
10 #include "hmbdc/app/Base.hpp"
11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
12 #include "hmbdc/comm/eth/Misc.h"
13 #include "hmbdc/time/Rater.hpp"
14 #include "hmbdc/numeric/BitMath.hpp"
15 
16 #include <regex>
17 #include <memory>
18 #include <type_traits>
19 #include <mutex>
20 
21 
22 #include <netinet/ether.h> /* ether_aton */
23 #include <linux/if_packet.h> /* sockaddr_ll */
24 #include <sys/sysctl.h> /* sysctl */
25 #include <ifaddrs.h> /* getifaddrs */
26 
27 #include <poll.h>
28 
29 namespace hmbdc { namespace app { namespace netmap {
30 
31 struct NetContext;
32 struct Sender;
33 
34 namespace sendtransportengine_detail {
35 using namespace std;
36 using namespace hmbdc::time;
37 using namespace hmbdc::comm::eth;
38 
39 /**
40  * @brief power a netmap port sending functions
41  * @details this needs to be created using NetContext and start in an app::Context
42  *
43  */
45 : Client<SendTransportEngine> {
46  using ptr = std::shared_ptr<SendTransportEngine>;
47 
48 private:
49  uint16_t
50  outBufferSizePower2();
51 
52  friend struct hmbdc::app::netmap::NetContext; //only be created by NetContext
53  SendTransportEngine(Config const&, size_t);
54 public:
55 
56 /**
57  * @brief if the user choose no to have a Context to manage and run the engine
58  * this method can be called from any thread from time to time to have the engine
59  * do its job
60  * @details thread safe and non-blocking, if the engine is already being powered by a Context,
61  * this method has no effect
62  */
63  void rotate() {
64  if (runLock_.try_lock()) {
66  runLock_.unlock();
67  }
68  }
69 
70  /*virtual*/
71  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
72  runLock_.lock();
73  }
74 
76  bool droppedCb() override;
77 
78  /*virtual*/
79  void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override {
80  for (int i = nmd_->first_tx_ring; i <= nmd_->last_tx_ring; i++) {
81  struct netmap_ring * txring = NETMAP_TXRING(nmd_->nifp, i);
82  if (nm_ring_empty(txring))
83  continue;
84 
85  sendPackets(txring);
86  }
87  if (hmbdc_unlikely(ioctl(nmd_->fd, NIOCTXSYNC, NULL) < 0)) {
88  HMBDC_THROW(std::runtime_error, "IO error");
89  }
90  }
91 
92  void stoppedCb(std::exception const& e) override;
93 
94  char const* hmbdcName() const {
95  return this->hmbdcName_.c_str();
96  }
97 
98  std::tuple<char const*, int> schedSpec() const {
99  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
100  }
101 
102 private:
103  bool match(Topic const& t) const {
104  return std::regex_match(t.c_str(), topicRegex_);
105  }
106  friend struct hmbdc::app::netmap::Sender;
107  template <typename... Messages>
108  void queue(Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
109  auto n = sizeof...(msgs);
110  auto it = buffer_.claim(n);
111  queue(it, t, std::forward<Messages>(msgs)...);
112  buffer_.commit(it, n);
113  }
114 
115  template <typename... Messages>
116  bool tryQueue(Topic const& t, Messages&&... msgs) HMBDC_RESTRICT {
117  auto n = sizeof...(msgs);
118  auto it = buffer_.tryClaim(n);
119  if (it) {
120  queue(it, t, std::forward<Messages>(msgs)...);
121  buffer_.commit(it, n);
122  return true;
123  }
124  return false;
125  }
126 
127  template <typename M, typename... Messages>
129  , Topic const& t, M&& msg, Messages&&... msgs) {
130  using Message = typename std::decay<M>::type;
131  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
132  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_)) {
133  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
134  }
135  auto s = *it;
136  TransportMessageHeader::copyTo(s, t, std::forward<M>(msg));
137  queue(++it, t, std::forward<M>(msgs)...);
138  }
139 
140  template <typename Message, typename ... Args>
141  void queueInPlace(Topic const& t, Args&&... args) HMBDC_RESTRICT {
142  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
143  if (hmbdc_unlikely(sizeof(Message) > maxMessageSize_)) {
144  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
145  }
146  auto s = buffer_.claim();
147  TransportMessageHeader::copyToInPlace<Message>(*s, t, std::forward<Args>(args)...);
148  buffer_.commit(s);
149  }
150 
151  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len);
153  , Topic const& t) {}
154 
155  void sendPackets(struct netmap_ring *);
156 
157  void getMacAddresses();
158  static
159  void initializePacket(struct pkt *, int, std::string, std::string, ether_addr, ether_addr, uint16_t, uint16_t);
160 
161  static
162  void updatePacket(struct pkt *, size_t, bool = true);
163 
164  Config const config_;
165  string hmbdcName_;
166  string schedPolicy_;
167  int schedPriority_;
168  std::string topic_;
169  std::regex topicRegex_;
170  size_t maxMessageSize_;
171 
172 
173  struct nm_desc *nmd_;
175  int virtHeader_; //v hdr len
176 
177 
178  ether_addr srcEthAddr_;
179  ether_addr dstEthAddr_;
180  pkt precalculatedPacketHead_;
181  bool doChecksum_;
182  Rater rater_;
183  size_t maxSendBatch_;
184  uint16_t mtu_;
185  std::mutex runLock_;
186 };
187 
188 } //sendtransportengine_detail
189 
191 
192 }}}
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:46
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:79
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Misc.h:9
Definition: TypedString.hpp:76
fascade class for sending network messages
Definition: Sender.hpp:11
Definition: Misc.h:55
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: SendTransportEngine.hpp:63
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
called before any messages got dispatched - only once
Definition: SendTransportEngine.hpp:71
power a netmap port sending functions
Definition: SendTransportEngine.hpp:44
Definition: Rater.hpp:10
a singleton that holding netmap resources
Definition: NetContext.hpp:38
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
Definition: Rater.hpp:11
Definition: Base.hpp:13
Definition: LockFreeBufferMisc.hpp:89