hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/udpcast/Transport.hpp"
4 #include "hmbdc/text/StringTrieSet.hpp"
5 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
6 #include "hmbdc//MetaUtils.hpp"
7 
8 #include <boost/lexical_cast.hpp>
9 
10 #include <memory>
11 #include <type_traits>
12 #include <mutex>
13 
14 namespace hmbdc { namespace app { namespace udpcast {
15 
16 /**
17  * @brief interface to power a multicast transport receiving functions
18  */
20  using ptr = std::shared_ptr<RecvTransport>;
21  using Transport::Transport;
22  virtual ~RecvTransport(){}
23  /**
24  * @brief a take all arbitrator (no arbitration at all)
25  * @details it provides the default type for arbitration which does nothing
26  * it also provides a template on writing a user defined arbitrator
27  *
28  * @param h handle to retrieve what is inside of the message
29  * @return always returns 1 here
30  * In general: 1 keep the message; -1 to drop;
31  * 0 cannot decide, ask me later.
32  */
33  struct NoOpArb {
34  int operator()(TransportMessageHeader const* h) {
35  return 1; //always keep it
36  }
37  };
38 private:
39  friend struct NetContext; //only be used by NetContext
40  virtual void listenTo(Topic const& t) = 0;
41  virtual void stopListenTo(Topic const& t) = 0;
42 };
43 
44 namespace recvtransportengine_detail {
45 using namespace hmbdc::time;
46 using namespace std;
48 
49 /**
50  * @class RecvTransportImpl<>
51  * @brief impl class
52  *
53  * @tparam OutputBuffer type of buffer to hold resulting network messages
54  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, suited to arbitrate
55  * between different recv transport. By default, keeping all
56  */
57 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
59 : RecvTransport {
60  using MD = MessageDispacher<RecvTransportImpl, std::tuple<Subscribe, Unsubscribe>>;
61  friend struct NetContext; //only be created by NetContext
62 
63 /**
64  * @brief ctor
65  * @details io_service could be passed in by user, in this case NO more than two threads should
66  * power this io_service instance since that would violate the thread garantee of Client, which
67  * is no callbacks are called in parallel
68  *
69  * @param cfg specify the details of the udpcast transport
70  * @param outputBuffer holding the results
71  * @param arb arbitrator instance to decide which messages to drop and keep; it supports either
72  * raw udp packet (BEFORE topic filtering) or hmbdc udpcast message (AFTER topic filtering) level
73  * arbitration depending on which one of
74  * int operator()(void* bytes, size_t len) or
75  * int operator()(TransportMessageHeader const* header) presents in the arb
76  */
78  , OutputBuffer& outputBuffer
79  , MsgArbitrator arb = NoOpArb())
80  : RecvTransport(cfg)
81  , buffer_(std::max(sizeof(MessageWrap<Subscribe>), sizeof(MessageWrap<Unsubscribe>))
82  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
83  , outputBuffer_(outputBuffer)
84  , maxItemSize_(outputBuffer.maxItemSize())
85  , buf_(new char[mtu_])
86  , bufCur_(buf_)
87  , bytesRecved_(0)
88  , arb_(arb) {
89  uint32_t yes = 1;
90  if (setsockopt(fd, SOL_SOCKET,SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
91  HMBDC_LOG_C("failed to set reuse address errno=", errno);
92  }
93  auto sz = config_.getExt<int>("udpRecvBufferBytes");
94  if (sz) {
95  if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
96  HMBDC_LOG_C("failed to set send buffer size=", sz);
97  }
98  }
99 
100  auto udpcastListenPort = config_.getExt<uint16_t>("udpcastListenPort");
101  struct sockaddr_in udpcastListenAddrPort;
102  memset(&udpcastListenAddrPort, 0, sizeof(udpcastListenAddrPort));
103  udpcastListenAddrPort.sin_family = AF_INET;
104  auto ipStr = cfg.getExt<string>("udpcastListenAddr") == string("ifaceAddr")
105  ? comm::inet::getLocalIpMatchMask(cfg.getExt<string>("ifaceAddr")):cfg.getExt<string>("udpcastListenAddr");
106  udpcastListenAddrPort.sin_addr.s_addr = inet_addr(ipStr.c_str());
107  udpcastListenAddrPort.sin_port = htons(udpcastListenPort);
108  if (bind(fd, (struct sockaddr *)&udpcastListenAddrPort, sizeof(udpcastListenAddrPort)) < 0) {
109  HMBDC_THROW(runtime_error, "failed to bind unicast udpcast listen address "
110  << ipStr << ':' << cfg.getExt<short>("udpcastListenPort") << " errno=" << errno);
111  }
112 
113  if ((udpcastListenAddrPort.sin_addr.s_addr & 0x000000F0) == 0xE0) {//multicast address
114  /* use setsockopt() to request that the kernel join a multicast group */
115  struct ip_mreq mreq;
116  mreq.imr_multiaddr.s_addr = udpcastListenAddrPort.sin_addr.s_addr;
117  auto iface =
118  comm::inet::getLocalIpMatchMask(config_.getExt<string>("ifaceAddr"));
119  mreq.imr_interface.s_addr=inet_addr(iface.c_str());
120  if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
121  HMBDC_THROW(runtime_error, "failed to join " << ipStr << ':'
122  << cfg.getExt<short>("udpcastPort"));
123  }
124  }
125 
126  start();
127  }
128 
129  ~RecvTransportImpl() {
130  delete [] buf_;
131  }
132 
133 /**
134  * @brief start the show by schedule the mesage recv
135  */
136  void start() {
137  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, *this);
138  }
139 
140  void runOnce(bool alwaysPoll = false) HMBDC_RESTRICT {
141  typename Buffer::iterator begin, end;
142  auto n = buffer_.peek(begin, end);
143  auto it = begin;
144  while (it != end) {
145  MD()(*this, *static_cast<MessageHead*>(*it++));
146  }
147  buffer_.wasteAfterPeek(begin, n);
148  if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
149  utils::EpollTask::instance().poll();
150  }
151  resumeRead();
152  }
153 
154 /**
155  * @brief only used by MD
156  */
157  void handleMessageCb(Subscribe const& t) {
158  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
159  }
160 
161 /**
162  * @brief only used by MD
163  */
164  void handleMessageCb(Unsubscribe const& t) {
165  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
166  }
167 
168 
169  void listenTo(Topic const& t) override {
170  buffer_.put(MessageWrap<Subscribe>{t});
171  }
172 
173  void stopListenTo(Topic const& t) override {
174  buffer_.put(MessageWrap<Unsubscribe>{t});
175  }
176  sockaddr_in udpcastListenRemoteAddr = {0};
177 private:
178  template <bool is_raw_arb>
179  typename enable_if<is_raw_arb, int>::type applyRawArb(void* pkt, size_t len) {
180  return arb_(pkt, len);
181  }
182 
183  template <bool is_raw_arb>
184  typename enable_if<!is_raw_arb, int>::type applyRawArb(void*, size_t) {
185  return 1;
186  }
187 
188  template <bool is_raw_arb>
189  typename enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
190  return arb_(h);
191  }
192 
193  template <bool is_raw_arb>
194  typename enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
195  return 1;
196  }
197 
198  void resumeRead() HMBDC_RESTRICT {
199  using traits =
201  using arg0 = typename traits::template arg<0>::type;
202  bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
203  do {
204  if (bytesRecved_ && bufCur_ == buf_) {
205  auto a = applyRawArb<is_raw_arb>(buf_, bytesRecved_);
206  if (hmbdc_unlikely(a == 0)) {
207  //keep bufCur_ and bytesRecved_ unchanged
208  return;
209  } else if (hmbdc_unlikely(a < 0)) { //drop the whole packet
210  bytesRecved_ = 0;
211  } // else process it
212  }
213  while (bytesRecved_) {
214  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
215  auto wireSize = h->wireSize();
216 
217  if (hmbdc_likely(bytesRecved_ >= wireSize)) {
218  if (subscriptions_.check(h->topic())) {
219  if (hmbdc_unlikely(wireSize > bytesRecved_)) {
220  break;
221  }
222  auto a = applyArb<is_raw_arb>(h);
223 
224  if (hmbdc_likely(a > 0)) {
225  auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
226  outputBuffer_.put(h->payload(), l);
227  } else if (a == 0) {
228  //keep bufCur_ and bytesRecved_ unchanged
229  return;
230  } //else drop and move on
231  }
232  bytesRecved_ -= wireSize;
233  bufCur_ += wireSize;
234  } else {
235  break;
236  }
237  }
238  bytesRecved_ = 0;
239  bufCur_ = buf_;
240  if (isFdReady()) {
241  socklen_t addrLen = sizeof(udpcastListenRemoteAddr);
242  auto l = recvfrom(fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT
243  , (struct sockaddr *)&udpcastListenRemoteAddr, &addrLen);
244  if (hmbdc_unlikely(l < 0)) {
245  if (!checkErr()) {
246  HMBDC_LOG_C("recvmmsg failed errno=", errno);
247  }
248  return;
249  } else if (l == 0) {
250  //nothing to do now
251  return;
252  }
253  bytesRecved_ = l;
254  }
255  } while(bytesRecved_);
256  }
257 
258  Buffer buffer_;
259  OutputBuffer& outputBuffer_;
260  size_t maxItemSize_;
261  char* buf_;
262  char* bufCur_;
263  size_t bytesRecved_;
264  text::StringTrieSet subscriptions_;
265  MsgArbitrator arb_;
266 };
267 
268 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
270 : RecvTransportImpl<OutputBuffer, MsgArbitrator>
271 , Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
275 
276 /**
277  * @brief if the user choose no to have a Context to manage and run the engine
278  * this method can be called from any thread from time to time to have the engine
279  * do its job
280  * @details thread safe and non-blocking, if the engine is already being powered by a Context,
281  * this method has no effect
282  */
283  void rotate() {
284  if (runLock_.try_lock()) {
285  RecvTransportEngineImpl::invokedCb(0);
286  runLock_.unlock();
287  }
288  }
289 
290 /**
291  * @brief power the io_service and other things
292  *
293  */
294  /*virtual*/
295  void invokedCb(uint16_t) HMBDC_RESTRICT override {
297  }
298 
299 
300 /**
301  * @brief start the show by schedule the mesage recv
302  */
303  /*virtual*/
304  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
305  runLock_.lock();
306  };
307 
308 /**
309  * @brief should not happen ever unless an exception thrown
310  *
311  * @param e exception thown
312  */
313  /*virtual*/
314  void stoppedCb(std::exception const& e) override {
315  HMBDC_LOG_C(e.what());
316  };
317 
319  this->runLock_.unlock();
320  }
321 
322 private:
323  std::mutex runLock_;
324 };
325 
326 } //recvtransportengine_detail
327 
328 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
330 
331 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
333 
334 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:225
Definition: MonoLockFreeBuffer.hpp:15
void invokedCb(uint16_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:295
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:46
void handleMessageCb(Unsubscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:164
a singleton that holding udpcast resources
Definition: NetContext.hpp:37
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:19
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:76
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
Definition: Messages.hpp:65
void handleMessageCb(Subscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:157
Definition: Message.hpp:78
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:77
impl class
Definition: RecvTransportEngine.hpp:58
Definition: StringTrieSetDetail.hpp:115
void start()
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:136
Definition: Message.hpp:112
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:314
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
Definition: Messages.hpp:73
Definition: Base.hpp:13
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: RecvTransportEngine.hpp:283
Definition: MetaUtils.hpp:9
Definition: LockFreeBufferMisc.hpp:89
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:304