hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/udpcast/Transport.hpp"
4 #include "hmbdc/text/StringTrieSet.hpp"
5 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
6 #include "hmbdc//MetaUtils.hpp"
7 
8 #include <boost/lexical_cast.hpp>
9 
10 #include <memory>
11 #include <type_traits>
12 #include <mutex>
13 
14 namespace hmbdc { namespace app { namespace udpcast {
15 
16 /**
17  * @brief interface to power a multicast transport receiving functions
18  */
20  using ptr = std::shared_ptr<RecvTransport>;
21  using Transport::Transport;
22  virtual ~RecvTransport(){}
23  /**
24  * @brief a take all arbitrator (no arbitration at all)
25  * @details it provides the default type for arbitration which does nothing
26  * it also provides a template on writing a user defined arbitrator
27  *
28  * @param h handle to retrieve what is inside of the message
29  * @return always returns 1 here
30  * In general: 1 keep the message; -1 to drop;
31  * 0 cannot decide, ask me later.
32  */
33  struct NoOpArb {
34  int operator()(TransportMessageHeader const* h) {
35  return 1; //always keep it
36  }
37  };
38 private:
39  friend struct NetContext; //only be used by NetContext
40  virtual void listenTo(Topic const& t) = 0;
41  virtual void stopListenTo(Topic const& t) = 0;
42 };
43 
44 namespace recvtransportengine_detail {
45 using namespace hmbdc::time;
46 using namespace std;
48 
49 /**
50  * @class RecvTransportImpl<>
51  * @brief impl class
52  *
53  * @tparam OutputBuffer type of buffer to hold resulting network messages
54  * @tparam MsgArbitrator arbitrator to decide drop or keep messages, suited to arbitrate
55  * between different recv transport. By default, keeping all
56  */
57 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
59 : RecvTransport {
60  using MD = MessageDispacher<RecvTransportImpl, std::tuple<Subscribe, Unsubscribe>>;
61  friend struct NetContext; //only be created by NetContext
62 
63 /**
64  * @brief ctor
65  * @details io_service could be passed in by user, in this case NO more than two threads should
66  * power this io_service instance since that would violate the thread garantee of Client, which
67  * is no callbacks are called in parallel
68  *
69  * @param cfg specify the details of the udpcast transport
70  * @param outputBuffer holding the results
71  * @param arb arbitrator instance to decide which messages to drop and keep; it supports either
72  * raw udp packet (BEFORE topic filtering) or hmbdc udpcast message (AFTER topic filtering) level
73  * arbitration depending on which one of
74  * int operator()(void* bytes, size_t len) or
75  * int operator()(TransportMessageHeader const* header) presents in the arb
76  */
78  , OutputBuffer& outputBuffer
79  , MsgArbitrator arb = NoOpArb())
80  : RecvTransport(cfg)
81  , buffer_(std::max(sizeof(MessageWrap<Subscribe>), sizeof(MessageWrap<Unsubscribe>))
82  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
83  , outputBuffer_(outputBuffer)
84  , maxItemSize_(outputBuffer.maxItemSize())
85  , buf_(new char[mtu_])
86  , bufCur_(buf_)
87  , bytesRecved_(0)
88  , arb_(arb) {
89  uint32_t yes = 1;
90  if (setsockopt(fd, SOL_SOCKET,SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
91  HMBDC_LOG_C("failed to set reuse address errno=", errno);
92  }
93  auto sz = config_.getExt<int>("udpRecvBufferBytes");
94  if (sz) {
95  if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
96  HMBDC_LOG_C("failed to set send buffer size=", sz);
97  }
98  }
99 
100  auto udpcastListenPort = config_.getExt<uint16_t>("udpcastListenPort");
101  struct sockaddr_in udpcastListenAddrPort;
102  memset(&udpcastListenAddrPort, 0, sizeof(udpcastListenAddrPort));
103  udpcastListenAddrPort.sin_family = AF_INET;
104  auto ipStr = cfg.getExt<string>("udpcastListenAddr") == string("ifaceAddr")
105  ? comm::inet::getLocalIpMatchMask(cfg.getExt<string>("ifaceAddr")):cfg.getExt<string>("udpcastListenAddr");
106  udpcastListenAddrPort.sin_addr.s_addr = inet_addr(ipStr.c_str());
107  udpcastListenAddrPort.sin_port = htons(udpcastListenPort);
108  if (bind(fd, (struct sockaddr *)&udpcastListenAddrPort, sizeof(udpcastListenAddrPort)) < 0) {
109  HMBDC_THROW(runtime_error, "failed to bind unicast udpcast listen address "
110  << ipStr << ':' << cfg.getExt<short>("udpcastListenPort") << " errno=" << errno);
111  }
112 
113  if ((udpcastListenAddrPort.sin_addr.s_addr & 0x000000F0) == 0xE0) {//multicast address
114  /* use setsockopt() to request that the kernel join a multicast group */
115  struct ip_mreq mreq;
116  mreq.imr_multiaddr.s_addr = udpcastListenAddrPort.sin_addr.s_addr;
117  auto iface =
118  comm::inet::getLocalIpMatchMask(config_.getExt<string>("ifaceAddr"));
119  mreq.imr_interface.s_addr=inet_addr(iface.c_str());
120  if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
121  HMBDC_THROW(runtime_error, "failed to join " << ipStr << ':'
122  << cfg.getExt<short>("udpcastPort"));
123  }
124  }
125  HMBDC_LOG_N("listen at ", cfg.getExt<string>("ifaceAddr"));
126 
127  start();
128  }
129 
130  ~RecvTransportImpl() {
131  delete [] buf_;
132  }
133 
134 /**
135  * @brief start the show by schedule the mesage recv
136  */
137  void start() {
138  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, *this);
139  }
140 
141  void runOnce(bool alwaysPoll = false) HMBDC_RESTRICT {
142  typename Buffer::iterator begin, end;
143  auto n = buffer_.peek(begin, end);
144  auto it = begin;
145  while (it != end) {
146  MD()(*this, *static_cast<MessageHead*>(*it++));
147  }
148  buffer_.wasteAfterPeek(begin, n);
149  if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
150  utils::EpollTask::instance().poll();
151  }
152  resumeRead();
153  }
154 
155 /**
156  * @brief only used by MD
157  */
158  void handleMessageCb(Subscribe const& t) {
159  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
160  }
161 
162 /**
163  * @brief only used by MD
164  */
165  void handleMessageCb(Unsubscribe const& t) {
166  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
167  }
168 
169 
170  void listenTo(Topic const& t) override {
171  buffer_.put(MessageWrap<Subscribe>{t});
172  }
173 
174  void stopListenTo(Topic const& t) override {
175  buffer_.put(MessageWrap<Unsubscribe>{t});
176  }
177  sockaddr_in udpcastListenRemoteAddr = {0};
178 private:
179  template <bool is_raw_arb>
180  typename enable_if<is_raw_arb, int>::type applyRawArb(void* pkt, size_t len) {
181  return arb_(pkt, len);
182  }
183 
184  template <bool is_raw_arb>
185  typename enable_if<!is_raw_arb, int>::type applyRawArb(void*, size_t) {
186  return 1;
187  }
188 
189  template <bool is_raw_arb>
190  typename enable_if<!is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
191  return arb_(h);
192  }
193 
194  template <bool is_raw_arb>
195  typename enable_if<is_raw_arb, int>::type applyArb(TransportMessageHeader* h) {
196  return 1;
197  }
198 
199  void resumeRead() HMBDC_RESTRICT {
200  using traits =
202  using arg0 = typename traits::template arg<0>::type;
203  bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
204  do {
205  if (bytesRecved_ && bufCur_ == buf_) {
206  auto a = applyRawArb<is_raw_arb>(buf_, bytesRecved_);
207  if (hmbdc_unlikely(a == 0)) {
208  //keep bufCur_ and bytesRecved_ unchanged
209  return;
210  } else if (hmbdc_unlikely(a < 0)) { //drop the whole packet
211  bytesRecved_ = 0;
212  } // else process it
213  }
214  while (bytesRecved_) {
215  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
216  auto wireSize = h->wireSize();
217 
218  if (hmbdc_likely(bytesRecved_ >= wireSize)) {
219  if (subscriptions_.check(h->topic())) {
220  if (hmbdc_unlikely(wireSize > bytesRecved_)) {
221  break;
222  }
223  auto a = applyArb<is_raw_arb>(h);
224 
225  if (hmbdc_likely(a > 0)) {
226  auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
227  outputBuffer_.put(h->payload(), l);
228  } else if (a == 0) {
229  //keep bufCur_ and bytesRecved_ unchanged
230  return;
231  } //else drop and move on
232  }
233  bytesRecved_ -= wireSize;
234  bufCur_ += wireSize;
235  } else {
236  break;
237  }
238  }
239  bytesRecved_ = 0;
240  bufCur_ = buf_;
241  if (isFdReady()) {
242  socklen_t addrLen = sizeof(udpcastListenRemoteAddr);
243  auto l = recvfrom(fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT
244  , (struct sockaddr *)&udpcastListenRemoteAddr, &addrLen);
245  if (hmbdc_unlikely(l < 0)) {
246  if (!checkErr()) {
247  HMBDC_LOG_C("recvmmsg failed errno=", errno);
248  }
249  return;
250  } else if (l == 0) {
251  //nothing to do now
252  return;
253  }
254  bytesRecved_ = l;
255  }
256  } while(bytesRecved_);
257  }
258 
259  Buffer buffer_;
260  OutputBuffer& outputBuffer_;
261  size_t maxItemSize_;
262  char* buf_;
263  char* bufCur_;
264  size_t bytesRecved_;
265  text::StringTrieSet subscriptions_;
266  MsgArbitrator arb_;
267 };
268 
269 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
271 : RecvTransportImpl<OutputBuffer, MsgArbitrator>
272 , Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
276 
277 /**
278  * @brief if the user choose no to have a Context to manage and run the engine
279  * this method can be called from any thread from time to time to have the engine
280  * do its job
281  * @details thread safe and non-blocking, if the engine is already being powered by a Context,
282  * this method has no effect
283  */
284  void rotate() {
285  if (runLock_.try_lock()) {
286  RecvTransportEngineImpl::invokedCb(0);
287  runLock_.unlock();
288  }
289  }
290 
291 /**
292  * @brief power the io_service and other things
293  *
294  */
295  /*virtual*/
296  void invokedCb(size_t) HMBDC_RESTRICT override {
298  }
299 
300 
301 /**
302  * @brief start the show by schedule the mesage recv
303  */
304  /*virtual*/
305  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
306  runLock_.lock();
307  };
308 
309 /**
310  * @brief should not happen ever unless an exception thrown
311  *
312  * @param e exception thown
313  */
314  /*virtual*/
315  void stoppedCb(std::exception const& e) override {
316  HMBDC_LOG_C(e.what());
317  };
318 
320  this->runLock_.unlock();
321  }
322 
323 private:
324  std::mutex runLock_;
325 };
326 
327 } //recvtransportengine_detail
328 
329 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
331 
332 template <typename OutputBuffer, typename MsgArbitrator = RecvTransport::NoOpArb>
334 
335 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:226
Definition: MonoLockFreeBuffer.hpp:15
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:46
void handleMessageCb(Unsubscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:165
a singleton that holding udpcast resources
Definition: NetContext.hpp:37
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:19
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Topic.hpp:44
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
Definition: Messages.hpp:65
void handleMessageCb(Subscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:158
Definition: Message.hpp:78
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:77
impl class
Definition: RecvTransportEngine.hpp:58
Definition: StringTrieSetDetail.hpp:115
void start()
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:137
Definition: Message.hpp:112
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:315
Definition: Rater.hpp:10
void invokedCb(size_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:296
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
Definition: Messages.hpp:73
Definition: Base.hpp:13
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: RecvTransportEngine.hpp:284
Definition: MetaUtils.hpp:9
Definition: LockFreeBufferMisc.hpp:89
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:305