hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/rmcast/Transport.hpp"
4 #include "hmbdc/tips/rmcast/McRecvTransport.hpp"
5 #include "hmbdc/tips/rmcast/BackupRecvSession.hpp"
6 #include "hmbdc/tips/rmcast/DefaultUserConfig.hpp"
7 #include "hmbdc/tips/reliable/AttBufferAdaptor.hpp"
8 #include "hmbdc/app/Logger.hpp"
9 #include "hmbdc/comm/inet/Hash.hpp"
10 
11 #include <boost/bind.hpp>
12 #include <boost/lexical_cast.hpp>
13 #include <boost/unordered_map.hpp>
14 
15 #include <memory>
16 #include <regex>
17 #include <type_traits>
18 #include <mutex>
19 
20 namespace hmbdc { namespace tips { namespace rmcast {
21 
22 /**
23  * @brief interface to power a multicast transport receiving functions
24  */
28  , cfg.resetSection("rx", false)))
29  {}
30 
31  virtual ~RecvTransport(){}
32 };
33 
34 namespace recvtransportengine_detail {
35 
36 using namespace std;
37 using namespace hmbdc::app;
38 using namespace hmbdc::time;
39 
40 
41 
42 /**
43  * @brief impl class
44  *
45  * @tparam OutputBuffer type of buffer to hold resulting network messages
46  * between different recv transport. By default, keeping all
47  */
48 template <typename OutputBuffer, typename AttachmentAllocator>
51 , TimerManager {
53 
54 /**
55  * @brief ctor
56  * @details io_service could be passed in by user, in this case NO more than two threads should
57  * power this io_service instance since that would violate the thread garantee of Client, which
58  * is no callbacks are called in parallel
59  *
60  * @param cfg specify the details of the rmcast transport
61  * @param outputBuffer holding the results
62  */
63  RecvTransportImpl(Config const& cfg, OutputBuffer& outputBuffer)
64  : RecvTransport(cfg)
65  , cmdBuffer_(std::max({sizeof(MessageWrap<TypeTagBackupSource>)})
66  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
67  , outputBuffer_(outputBuffer)
68  , myBackupIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
69  config_.getExt<string>("tcpIfaceAddr") == string("ifaceAddr")
70  ?config_.getExt<string>("ifaceAddr"):config_.getExt<string>("tcpIfaceAddr")
71  ).c_str()))
72  , mcRecvTransport_(config_
73  , cmdBuffer_ //to store cmds
74  , subscriptions_
75  , ep2SessionDict_)
76  , loopback_(config_.getExt<bool>("loopback")) {
77  subscriptions_.addAll<std::tuple<app::StartMemorySegTrain, app::MemorySeg>>();
78  if (!config_.getExt<bool>("allowRecvWithinProcess")) {
79  myPid_ = getpid();
80  }
81  }
82 
83 
84 /**
85  * @brief start the show by schedule the mesage recv
86  */
87  void start() {
88  mcRecvTransport_.start();
89  }
90 
91  void runOnce() HMBDC_RESTRICT {
92  utils::EpollTask::instance().poll();
94  auto n = cmdBuffer_.peek(begin, end);
95  auto it = begin;
96  while (it != end) {
97  MD()(*this, *static_cast<MessageHead*>(*it++));
98  }
99  cmdBuffer_.wasteAfterPeek(begin, n);
100  mcRecvTransport_.runOnce();
101  for (auto it = recvSessions_.begin(); it != recvSessions_.end();) {
102  if (hmbdc_unlikely(!it->second->runOnce())) {
103  it->second->stop();
104  cancel(*it->second);
105  ep2SessionDict_.erase(it->second->sendFrom);
106  recvSessions_.erase(it++);
107  } else {
108  it++;
109  }
110  }
111  }
112 
113 /**
114  * @brief only used by MH
115  */
117  auto ip = inet_addr(t.ip);
118  //unless using loopback address, don't EVER listen to myself (own process)
119  if (ip == myBackupIp_
120  && (!loopback_)) {
121  return;
122  }
123  if (t.srcPid == myPid_ && ip == myBackupIp_) return;
124  auto key = make_pair(ip, t.port);
125  if (recvSessions_.find(key) == recvSessions_.end()) {
126  for (auto i = 0u; i < t.typeTagCountContained; ++i) {
127  if (subscriptions_.check(t.typeTags[i])) {
128  try {
129  typename Session::ptr sess {
130  new Session(
131  config_
132  , t.sendFrom
133  , subscriptions_
134  , outputBuffer_
135  , t.recvReportDelay
136  )
137  };
138  sess->start(ip, t.port);
139  recvSessions_[key] = sess;
140  ep2SessionDict_[t.sendFrom] = sess;
141  schedule(SysTime::now(), *sess);
142  } catch (std::exception const& e) {
143  HMBDC_LOG_C(e.what());
144  }
145  break;
146  }
147  }
148  } //else ignore
149  }
150 
151  /**
152  * @brief check how many other parties are sending to this engine
153  * @return recipient session count are still active
154  */
155  size_t sessionsRemainingActive() const {
156  return recvSessions_.size();
157  }
158 
159  template <MessageTupleC Messages, typename CcNode>
160  void subscribeFor(CcNode const& node, uint16_t mod, uint16_t res) {
161  subscriptions_.addSubsFor<Messages>(node, mod, res);
162  }
163 private:
165 
167  TypeTagSet subscriptions_;
168  OutputBuffer outputBuffer_;
169  in_addr_t myBackupIp_;
172  using Ep2SessionDict //sender's ip address port ->
173  = boost::unordered_map<sockaddr_in, typename Session::ptr
174  , endpointhash, endpointequal>;
175  Ep2SessionDict ep2SessionDict_;
176  boost::unordered_map<pair<uint64_t, uint16_t>
177  , typename Session::ptr> recvSessions_;
179  bool loopback_;
180  uint32_t myPid_ = 0;
181 };
182 
183 template <typename OutputBuffer, typename AttachmentAllocator>
185 : RecvTransportImpl<OutputBuffer, AttachmentAllocator>
186 , Client<RecvTransportEngine<OutputBuffer, AttachmentAllocator>> {
187 private:
189 
190 public:
191  RecvTransportEngine(Config const& cfg
192  , OutputBuffer& outputBuffer)
193  : Impl(cfg, outputBuffer) {
194  Impl::start();
195  }
196 
197  void rotate() {
198  if (runLock_.try_lock()) {
199  this->checkTimers(time::SysTime::now());
201  runLock_.unlock();
202  }
203  }
204 
205  /*virtual*/
206  void messageDispatchingStartedCb(size_t const*) override {
207  runLock_.lock();
208  };
209 
210  /*virtual*/
211  void invokedCb(size_t) HMBDC_RESTRICT override {
212  Impl::runOnce();
213  }
214 
215 /**
216  * @brief should not happen ever unless an exception thrown
217  *
218  * @param e exception thown
219  */
220  /*virtual*/
221  void stoppedCb(std::exception const& e) override {
222  HMBDC_LOG_C(e.what());
223  };
224 
225  using EngineTransport::hmbdcName;
226 
227  std::tuple<char const*, int> schedSpec() const {
228  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
229  }
230 
232  this->runLock_.unlock();
233  }
234 private:
235  std::mutex runLock_;
236 };
237 
238 } //recvtransportengine_detail
239 
240 template <typename OutputBuffer, typename AttachmentAllocator>
241 using RecvTransportEngine = recvtransportengine_detail::RecvTransportEngine<OutputBuffer
242  , AttachmentAllocator>;
243 }}}
Definition: MonoLockFreeBuffer.hpp:16
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
void setAdditionalFallbackConfig(Config const &c)
set additional defaults
Definition: Config.hpp:154
Definition: Messages.hpp:90
impl class
Definition: McRecvTransport.hpp:29
Definition: TypedString.hpp:84
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer)
ctor
Definition: RecvTransportEngine.hpp:63
impl class
Definition: RecvTransportEngine.hpp:49
Definition: Timers.hpp:70
Config & resetSection(char const *section, bool sectionExists=true)
change section name
Definition: Config.hpp:177
Definition: Hash.hpp:32
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:221
Definition: MessageDispacher.hpp:184
Definition: Message.hpp:212
Definition: Hash.hpp:20
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:25
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: RecvTransportEngine.hpp:211
Definition: TypeTagSet.hpp:141
Definition: Message.hpp:263
size_t sessionsRemainingActive() const
check how many other parties are sending to this engine
Definition: RecvTransportEngine.hpp:155
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:122
void handleMessageCb(TypeTagBackupSource const &t)
only used by MH
Definition: RecvTransportEngine.hpp:116
Definition: Base.hpp:12
void start()
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:87
Definition: LockFreeBufferMisc.hpp:89
void messageDispatchingStartedCb(size_t const *) override
called before any messages got dispatched - only once
Definition: RecvTransportEngine.hpp:206