hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/tcpcast/Transport.hpp"
4 #include "hmbdc/app/tcpcast/RecvSession.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/app/udpcast/RecvTransportEngine.hpp"
7 #include "hmbdc/app/Logger.hpp"
8 #include "hmbdc/text/StringTrieSet.hpp"
9 
10 #include <boost/bind.hpp>
11 #include <boost/lexical_cast.hpp>
12 #include <boost/unordered_map.hpp>
13 #include <mutex>
14 #include <regex>
15 
16 #include <memory>
17 
18 namespace hmbdc { namespace app { namespace tcpcast {
19 
20 /**
21  * @brief interface to power a tcpcast transport receiving functions
22  */
24 : Transport {
25  using ptr = std::shared_ptr<RecvTransport>;
26  using Transport::Transport;
27  virtual ~RecvTransport(){}
28  /**
29  * @brief a take all arbitrator (no arbitration at all)
30  * @details it provides the default type for arbitration which does nothing
31  * it also provides a template on writing a user defined arbitrator
32  *
33  * @param h handle to retrieve what is inside of the message
34  * @return always returns 1 here
35  * In general: 1 keep the message; -1 to drop;
36  * 0 cannot decide, ask me later.
37  */
38  struct NoOpArb {
39  int operator()(TransportMessageHeader const* h) {
40  return 1; //always keep it
41  }
42  };
43 
44 private:
// The two subscription hooks below are private; NetContext is granted access through friendship.
45  friend struct NetContext; //only be used by NetContext
// Request that messages on topic t be received; implemented by the concrete engine.
46  virtual void listenTo(comm::Topic const& t) = 0;
// Request that messages on topic t no longer be received; implemented by the concrete engine.
47  virtual void stopListenTo(comm::Topic const& t) = 0;
48 };
49 
50 
51 namespace recvtransportengine_detail {
52 using namespace hmbdc::time;
53 using namespace std;
54 /**
55  * @brief impl class
56  * @details this needs to be created using NetContext and start in an app::Context
57  *
58  * @tparam OutputBuffer type of buffer to hold resulting network messages
59  */
60 template <typename OutputBuffer, typename MsgArbitrator>
65 , Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>> {
66  using MD = MessageDispacher<RecvTransportEngine
67  , std::tuple<Subscribe, Unsubscribe, TopicSource>>;
68  friend struct NetContext; //only be created by NetContext
69 
70 /**
71  * @brief ctor
72  * @details io_service could be passed in by user, in this case NO more than two threads should
73  * power this io_service instance since that would violate the thread guarantee of Client, which
74  * is that no callbacks are called in parallel
75  *
76  * @param cfg specify the details of the tcpcast transport
77  * @param outputBuffer holding the results
78  * @param arb arbitrator instance to decide which messages to drop and keep; it ONLY supports
79  * hmbdc message (AFTER topic filtering) level (NO packet level since it is tcp)
80  */
// NOTE(review): the constructor's opening line (embedded listing number 81) is not part of this
// listing; only the remaining parameters, the initializer list and the body are visible.
82  , OutputBuffer& outputBuffer
83  , MsgArbitrator arb = NoOpArb())
84  : RecvTransport(cfg)
// The engine itself is the heartbeat timer; its period comes from "heartbeatPeriodSeconds".
85  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("heartbeatPeriodSeconds")))
86  , config_(cfg)
87  , mcConfig_(cfg)
// Command buffer: each slot is sized for the largest of the three control messages.
88  , buffer_(std::max({sizeof(MessageWrap<Subscribe>)
89  , sizeof(MessageWrap<Unsubscribe>)
90  , sizeof(MessageWrap<TopicSource>)
91  }), config_.getExt<uint16_t>("cmdBufferSizePower2"))
92  , outputBuffer_(outputBuffer)
93  , maxItemSize_(outputBuffer.maxItemSize())
94  , arb_(arb)
// Local address resolved from "ifaceAddr"; handleMessageCb(TopicSource) compares against it.
95  , myIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
96  cfg.getExt<string>("ifaceAddr")).c_str()))
97  , mcastEmuProxyScanTimer_(
98  Duration::seconds(cfg.getExt<size_t>("mcastEmuProxyScanPeriodSeconds"))) {
// The output buffer must be able to hold the SessionStarted / SessionDropped notifications.
99  if (outputBuffer.maxItemSize() < sizeof(SessionStarted)
100  || outputBuffer.maxItemSize() < sizeof(SessionDropped)) {
101  HMBDC_THROW(std::out_of_range
102  , "buffer is too small for notification: SessionStarted and SessionDropped");
103  }
// The internal udpcast receiver uses a copy of the config with "outBufferSizePower2" forced to 3
// and delivers what it receives into the command buffer.
104  mcConfig_.put("outBufferSizePower2", 3u);
105  mcReceiver_.reset(new udpcast::RecvTransportImpl<decltype(buffer_)>(
106  mcConfig_, buffer_, udpcast::RecvTransport::NoOpArb()));
107 
// Proxy mode: load the proxy endpoints from "udpcastDests" (at least one is required) and
// pre-encode a TopicSink message carrying this receiver's udpcast listen address and port.
108  if (cfg.getExt<bool>("useTcpcastMcProxy")) {
109  config_(tcpcastMcEmuProxyAddrs_, "udpcastDests");
110  if (tcpcastMcEmuProxyAddrs_.size() == 0) {
111  HMBDC_THROW(std::out_of_range, "no TcpcastEmuProxy specified");
112  }
113  topicSinkMsgBufLen_ =
114  udpcast::SendTransport::encode(tcpcastAdTopic_, topicSinkMsgBuf_, sizeof(topicSinkMsgBuf_)
115  , TopicSink{cfg.getExt<std::string>("udpcastListenAddr")
116  , cfg.getExt<uint16_t>("udpcastListenPort")})->wireSize();
117  }
118 
119 
120  // mcReceiver_->start();
// Subscribe the udpcast receiver to the tcpcast advertisement topic.
121  mcReceiver_->listenTo(tcpcastAdTopic_);
122 
// Heartbeat timer callback: call heartbeat() on every current session.
123  setCallback(
124  [this](TimerManager& tm, SysTime const& now) {
125  for (auto& s : recvSessions_) {
126  s.second->heartbeat();
127  }
128  }
129  );
130 
131  schedule(SysTime::now(), *this);
// Only when proxy endpoints were configured: periodically send the pre-encoded TopicSink
// datagram to each of them. The send is non-blocking and its result is not checked.
132  if (tcpcastMcEmuProxyAddrs_.size()) {
133  mcastEmuProxyScanTimer_.setCallback(
134  [this](TimerManager& tm, SysTime const& now) {
135  for (auto const& s : tcpcastMcEmuProxyAddrs_) {
136  sendto(mcReceiver_->fd, topicSinkMsgBuf_, topicSinkMsgBufLen_
137  , MSG_NOSIGNAL|MSG_DONTWAIT, (struct sockaddr *)&s.v, sizeof(s.v));
138  }
139  }
140  );
141  schedule(SysTime::now(), mcastEmuProxyScanTimer_);
142  }
143  }
144 
145 /**
146  * @brief if the user chooses not to have a Context to manage and run the engine
147  * this method can be called from any thread from time to time to have the engine
148  * do its job
149  * @details thread safe and non-blocking, if the engine is already being powered by a Context,
150  * this method has no effect
151  */
// Runs one engine iteration on the calling thread if nobody else holds runLock_.
152  void rotate() {
// try_lock keeps the call non-blocking; messageDispatchingStartedCb() takes runLock_ when a
// Context starts dispatching, in which case this branch is skipped.
153  if (runLock_.try_lock()) {
154  this->checkTimers(time::SysTime::now());
// NOTE(review): the embedded listing numbers jump from 154 to 156 here, so a statement may be
// missing from this view - confirm against the original header.
// NOTE(review): the unlock below is skipped if anything above throws - confirm whether that matters.
156  runLock_.unlock();
157  }
158  }
159 
160 /**
161  * @brief start the show by scheduling the message recv
162  */
163  /*virtual*/
164  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
165  runLock_.lock();
166  };
167 
168  using Transport::hmbdcName;
169  using Transport::schedSpec;
170 
171 /**
172  * @brief should not happen ever unless an exception thrown
173  *
174  * @param e exception thrown
175  */
176  /*virtual*/
177  void stoppedCb(std::exception const& e) override {
178  HMBDC_LOG_C(e.what());
179  };
180 
181 /**
182  * @brief power the io_service and other things
183  *
184  */
185  /*virtual*/
// One engine iteration: service the sessions, drain queued commands, then run the udpcast receiver.
186  void invokedCb(size_t) HMBDC_RESTRICT override {
// Step 1: run every session once; a session whose runOnce() returns false is stopped and removed.
187  for (auto it = recvSessions_.begin(); it != recvSessions_.end();) {
188  if (hmbdc_unlikely(!(*it).second->runOnce())) {
189  (*it).second->stop();
// Post-increment hands erase() the old position while `it` already points at the next entry.
190  recvSessions_.erase(it++);
191  } else {
192  it++;
193  }
194  }
// NOTE(review): the embedded listing numbers jump from 194 to 196; the declaration of
// `begin` and `end` used below is not visible in this listing.
// Step 2: hand every queued command to MD, which presumably routes it to the matching
// handleMessageCb overload, then release the peeked range.
196  auto n = buffer_.peek(begin, end);
197  auto it = begin;
198  while (it != end) {
199  MD()(*this, *static_cast<MessageHead*>(*it++));
200  }
201  buffer_.wasteAfterPeek(begin, n);
202 
// Step 3: let the udpcast receiver run once; it was constructed to write into buffer_.
203  mcReceiver_->runOnce(true);
204  }
205 
206  /**
207  * @brief check how many other parties are sending to this engine
208  * @return the number of recipient sessions that are still active
209  */
210  size_t sessionsRemainingActive() const {
211  return recvSessions_.size();
212  }
213 
214 /**
215  * @brief only used by MD
216  */
217  void handleMessageCb(Subscribe const& t) {
218  subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
219  for (auto& p : recvSessions_) {
220  p.second->sendSubscribe(t.topic);
221  }
222  }
223 
224 /**
225  * @brief only used by MD
226  */
227  void handleMessageCb(Unsubscribe const& t) {
228  subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
229  for (auto& p : recvSessions_) {
230  p.second->sendUnsubscribe(t.topic);
231  }
232  }
233 
234 /**
235  * @brief only used by MD
236  */
237  void handleMessageCb(TopicSource const& t) {
238  auto ip = inet_addr(t.ip);
239  //unless using loopback address, don't EVER listen to myself (own process)
240  if (ip != 0x100007ful //127.0.0.1
241  && ip == myIp_
242  && (!t.loopback || t.pid == getpid())) {
243  return;
244  }
245  auto key = make_pair(ip, t.port);
246  if (recvSessions_.find(key) == recvSessions_.end()) {
247  std::regex topicRegex(t.topicRegex);
248  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
249  if (std::regex_match(it->first.c_str(), topicRegex)
250  || it->second) {
251  try {
252  auto sess = new Session(
253  config_
254  , subscriptions_
255  , topicRegex
256  , outputBuffer_
257  , t.connKey
258  , arb_
259  );
260  sess->start(ip, t.port);
261  recvSessions_[key] = typename Session::ptr(sess);
262  } catch (std::exception const& e) {
263  HMBDC_LOG_C(e.what());
264  }
265  break;
266  }
267  }
268  } //else ignore
269  }
270 
271  void listenTo(Topic const& t) override {
272  buffer_.put(MessageWrap<Subscribe>{t});
273  }
274  void stopListenTo(Topic const& t) override {
275  buffer_.put(MessageWrap<Unsubscribe>{t});
276  }
277 
// Releases runLock_, which messageDispatchingStartedCb() acquires and never releases itself.
// NOTE(review): std::mutex::unlock() requires the calling thread to own the mutex. If the engine
// was never started by a Context (runLock_ never locked), or the destructor runs on a different
// thread than the one that called messageDispatchingStartedCb(), this call is undefined
// behaviour - confirm the intended lifecycle.
278  ~RecvTransportEngine() {
279  this->runLock_.unlock();
280  }
281 
282 private:
284 
// config_: copy of the user config; mcConfig_: copy handed to the internal udpcast receiver
// (the constructor overrides "outBufferSizePower2" in it).
285  Config config_, mcConfig_;
// Destination for received messages; passed on to every Session that gets created.
287  OutputBuffer& outputBuffer_;
// Cached outputBuffer.maxItemSize() taken at construction.
288  size_t maxItemSize_;
// udpcast receiver that writes into the command buffer (buffer_).
289  unique_ptr<udpcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
290 
// Topics currently subscribed to, stored as text.
291  text::StringTrieSet subscriptions_;
// Sessions keyed by (ip, port) of the advertised source.
292  boost::unordered_map<pair<uint64_t, uint16_t>
293  , typename Session::ptr> recvSessions_;
// Arbitrator instance passed to each new Session.
294  MsgArbitrator arb_;
// Local address derived from "ifaceAddr"; used to recognise advertisements from this host.
295  uint32_t myIp_;
// Proxy endpoints read from "udpcastDests" when "useTcpcastMcProxy" is set.
296  std::vector<comm::inet::Endpoint> tcpcastMcEmuProxyAddrs_;
// Timer that periodically sends the TopicSink message to the proxy endpoints.
297  ReoccuringTimer mcastEmuProxyScanTimer_;
// Pre-encoded TopicSink message and its length; only assigned in proxy mode.
298  char topicSinkMsgBuf_[64];
299  size_t topicSinkMsgBufLen_;
// Held by the Context-driven path from messageDispatchingStartedCb(); rotate() only try_locks it.
300  std::mutex runLock_;
301 };
302 
303 } //recvtransportengine_detail
304 template <typename OutputBuffer, typename MsgArbitrator>
306 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:226
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:81
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Topic.hpp:44
Definition: Messages.hpp:74
Definition: Timers.hpp:66
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
Definition: Messages.hpp:67
void handleMessageCb(TopicSource const &t)
only used by MD
Definition: RecvTransportEngine.hpp:237
Definition: Transport.hpp:69
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:145
void handleMessageCb(Unsubscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:227
Definition: Message.hpp:78
Definition: Messages.hpp:154
Definition: Time.hpp:14
size_t sessionsRemainingActive() const
check how many other parties are sending to this engine
Definition: RecvTransportEngine.hpp:210
a singleton that holding tcpcast resources
Definition: NetContext.hpp:44
void rotate()
if the user chooses not to have a Context to manage and run the engine this method can be called from a...
Definition: RecvTransportEngine.hpp:152
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:38
impl class
Definition: RecvTransportEngine.hpp:58
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:112
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:177
Definition: Time.hpp:133
Definition: Messages.hpp:83
Definition: Rater.hpp:10
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:23
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
impl class
Definition: RecvTransportEngine.hpp:61
void invokedCb(size_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:186
Definition: Base.hpp:13
void handleMessageCb(Subscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:217
Definition: Timers.hpp:113
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:131
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:164
Definition: LockFreeBufferMisc.hpp:89