hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/tcpcast/Transport.hpp"
4 #include "hmbdc/tips/tcpcast/RecvSession.hpp"
5 #include "hmbdc/tips/tcpcast/Messages.hpp"
6 #include "hmbdc/tips/tcpcast/DefaultUserConfig.hpp"
7 #include "hmbdc/tips/udpcast/RecvTransportEngine.hpp"
8 #include "hmbdc/app/Logger.hpp"
9 
10 #include <boost/bind.hpp>
11 #include <boost/lexical_cast.hpp>
12 #include <boost/unordered_map.hpp>
13 #include <mutex>
14 #include <regex>
15 
16 #include <memory>
17 
18 namespace hmbdc { namespace tips { namespace tcpcast {
19 
20 /**
21  * @brief interface to power a tcpcast transport receiving functions
22  */
24 : Transport {
25  using Transport::Transport;
26  virtual ~RecvTransport(){}
27 };
28 
29 namespace recvtransportengine_detail {
30 using namespace hmbdc::time;
31 using namespace hmbdc::app;
32 using namespace std;
33 
34 template <typename OutputBuffer, typename AttachmentAllocator>
39 , Client<RecvTransportEngine<OutputBuffer, AttachmentAllocator>> {
41 
42 /**
43  * @brief ctor
44  * @details io_service could be passed in by user, in this case NO more than two threads should
45  * power this io_service instance since that would violate the thread guarantee of Client, which
46  * is that no callbacks are called in parallel
47  *
48  * @param cfg specify the details of the tcpcast transport
49  * @param outputBuffer holding the results
50  */
52  , OutputBuffer& outputBuffer)
53  : RecvTransport(((cfg.setAdditionalFallbackConfig(Config(DefaultUserConfig))
54  , cfg.resetSection("rx", false))))
55  , ReoccuringTimer(Duration::seconds(cfg.getExt<size_t>("heartbeatPeriodSeconds")))
56  , config_(cfg)
57  , mcConfig_(config_)
58  , buffer_(std::max({sizeof(MessageWrap<TypeTagSource>)})
59  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
60  , outputBuffer_(outputBuffer)
61  , maxItemSize_(outputBuffer.maxItemSize())
62  , myIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
63  config_.getExt<string>("ifaceAddr")).c_str())) {
64  if (!config_.getExt<bool>("allowRecvWithinProcess")) {
65  myPid_ = getpid();
66  }
67  if (outputBuffer.maxItemSize() < sizeof(SessionStarted)
68  || outputBuffer.maxItemSize() < sizeof(SessionDropped)) {
69  HMBDC_THROW(std::out_of_range
70  , "buffer is too small for notification: SessionStarted and SessionDropped");
71  }
72  mcConfig_.put("outBufferSizePower2", 3u);
73  mcReceiver_.emplace(mcConfig_, buffer_);
74  mcReceiver_->template subscribe<TypeTagSource>();
75 
76  setCallback(
77  [this](TimerManager& tm, SysTime const& now) {
78  for (auto& s : recvSessions_) {
79  s.second->heartbeat();
80  }
81  }
82  );
83 
84  schedule(SysTime::now(), *this);
85  }
86 
87  void rotate() {
88  this->checkTimers(time::SysTime::now());
90  }
91 
92  using Transport::hmbdcName;
93  using Transport::schedSpec;
94 
95 /**
96  * @brief should not htipsen ever unless an exception thrown
97  *
98  * @param e exception thown
99  */
100  /*virtual*/
101  void stoppedCb(std::exception const& e) override {
102  HMBDC_LOG_C(e.what());
103  };
104 
105 /**
106  * @brief power the io_service and other things
107  *
108  */
109  /*virtual*/
110  void invokedCb(size_t) HMBDC_RESTRICT override {
111  for (auto it = recvSessions_.begin(); it != recvSessions_.end();) {
112  if (hmbdc_unlikely(!(*it).second->runOnce())) {
113  (*it).second->stop();
114  recvSessions_.erase(it++);
115  } else {
116  it++;
117  }
118  }
120  auto n = buffer_.peek(begin, end);
121  auto it = begin;
122  while (it != end) {
123  MD()(*this, *static_cast<MessageHead*>(*it++));
124  }
125  buffer_.wasteAfterPeek(begin, n);
126 
127  mcReceiver_->runOnce(true);
128  }
129 
130  /**
131  * @brief check how many other parties are sending to this engine
132  * @return recipient session count are still active
133  */
134  size_t sessionsRemainingActive() const {
135  return recvSessions_.size();
136  }
137 
138  template <MessageTupleC Messages, typename CcNode>
139  void subscribeFor(CcNode const& node, uint16_t mod, uint16_t res) {
140  subscriptions_.addSubsFor<Messages>(node, mod, res);
141  }
142 
143 /**
144  * @brief only used by MD
145  */
147  auto ip = inet_addr(t.ip);
148  if (ip == myIp_
149  && !t.loopback) {
150  return;
151  }
152 
153  if (t.srcPid == myPid_ && ip == myIp_) return;
154  auto key = make_pair(ip, t.port);
155  if (recvSessions_.find(key) == recvSessions_.end()) {
156  for (auto i = 0u; i < t.typeTagCountContained; ++i) {
157  if (subscriptions_.check(t.typeTags[i])) {
158  try {
159  auto sess = new Session(
160  config_
161  , subscriptions_
162  , outputBuffer_
163  );
164  sess->start(ip, t.port);
165  recvSessions_[key] = typename Session::ptr(sess);
166  } catch (std::exception const& e) {
167  HMBDC_LOG_C(e.what());
168  }
169  break;
170  }
171  }
172  } //else ignore
173  }
174 
175 private:
177 
178  Config config_, mcConfig_;
180  OutputBuffer& outputBuffer_;
181  size_t maxItemSize_;
182  std::optional<udpcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
183 
184  TypeTagSet subscriptions_;
185  boost::unordered_map<pair<uint64_t, uint16_t>
186  , typename Session::ptr> recvSessions_;
187  uint32_t myIp_;
188  uint32_t myPid_ = 0;
189 };
190 
191 } //recvtransportengine_detail
192 template <typename OutputBuffer, typename AttachmentAllocator>
193 using RecvTransportEngine = recvtransportengine_detail::RecvTransportEngine<OutputBuffer, AttachmentAllocator>;
194 }}}
Definition: MonoLockFreeBuffer.hpp:16
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:23
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
void invokedCb(size_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:110
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
void handleMessageCb(TypeTagSource const &t)
only used by MD
Definition: RecvTransportEngine.hpp:146
Definition: Transport.hpp:65
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:120
RecvTransportEngine(Config cfg, OutputBuffer &outputBuffer)
ctor
Definition: RecvTransportEngine.hpp:51
Definition: MessageDispacher.hpp:184
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception is thrown
Definition: RecvTransportEngine.hpp:101
Definition: Message.hpp:212
Definition: Messages.hpp:57
Definition: Time.hpp:14
Definition: TypeTagSet.hpp:141
Definition: Message.hpp:263
Definition: Time.hpp:134
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: Base.hpp:12
Definition: Timers.hpp:117
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:106
size_t sessionsRemainingActive() const
check how many other parties are sending to this engine
Definition: RecvTransportEngine.hpp:134
Definition: LockFreeBufferMisc.hpp:89