hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/rnetmap/Transport.hpp"
4 #include "hmbdc/tips/rnetmap/NmRecvTransport.hpp"
5 #include "hmbdc/tips/rnetmap/BackupRecvSession.hpp"
6 #include "hmbdc/tips/rnetmap/DefaultUserConfig.hpp"
7 #include "hmbdc/tips/reliable/AttBufferAdaptor.hpp"
8 #include "hmbdc/app/Logger.hpp"
9 #include "hmbdc/comm/inet/Hash.hpp"
10 
11 #include <boost/bind.hpp>
12 #include <boost/lexical_cast.hpp>
13 #include <boost/unordered_map.hpp>
14 
15 #include <memory>
16 #include <regex>
17 #include <type_traits>
18 #include <mutex>
19 
20 namespace hmbdc { namespace tips { namespace rnetmap {
24  , cfg.resetSection("rx", false)))
25  {}
26  virtual ~RecvTransport(){}
27 
28 };
29 
30 namespace recvtransportengine_detail {
31 
32 using namespace std;
33 using namespace hmbdc::time;
34 using namespace hmbdc::app;
35 
36 template <typename OutputBuffer, typename AttachmentAllocator>
39 , TimerManager {
41  friend struct NetContext; //only be created by NetContext
42  RecvTransportImpl(Config const& cfgIn
43  , OutputBuffer& outputBuffer)
44  : RecvTransport(cfgIn)
45  , cmdBuffer_(std::max({sizeof(MessageWrap<TypeTagBackupSource>)})
46  , config_.getExt<uint16_t>("cmdBufferSizePower2"))
47  , outputBuffer_(outputBuffer)
48  , myBackupIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
49  config_.getExt<string>("tcpIfaceAddr") == string("ifaceAddr")
50  ?config_.getExt<string>("ifaceAddr"):config_.getExt<string>("tcpIfaceAddr")
51  ).c_str()))
52  , nmRecvTransport_(config_
53  , cmdBuffer_ //to store cmds
54  , subscriptions_
55  , ep2SessionDict_)
56  , loopback_(config_.getExt<bool>("loopback")) {
57  subscriptions_.addAll<std::tuple<app::StartMemorySegTrain, app::MemorySeg>>();
58  if (!config_.getExt<bool>("allowRecvWithinProcess")) {
59  myPid_ = getpid();
60  }
61  }
62 
63  void runOnce() HMBDC_RESTRICT {
64  utils::EpollTask::instance().poll();
66  auto n = cmdBuffer_.peek(begin, end);
67  auto it = begin;
68  while (it != end) {
69  MD()(*this, *static_cast<MessageHead*>(*it++));
70  }
71  cmdBuffer_.wasteAfterPeek(begin, n);
72  nmRecvTransport_.runOnce();
73  for (auto it = recvSessions_.begin(); it != recvSessions_.end();) {
74  if (hmbdc_unlikely(!it->second->runOnce())) {
75  it->second->stop();
76  cancel(*it->second);
77  ep2SessionDict_.erase(it->second->sendFrom);
78  recvSessions_.erase(it++);
79  } else {
80  it++;
81  }
82  }
83  }
84 
85 /**
86  * @brief only used by MD
87  */
89  auto ip = inet_addr(t.ip);
90  //unless using loopback address, don't EVER listen to myself (own process)
91  if (ip == myBackupIp_
92  && (!loopback_)) {
93  return;
94  }
95  if (t.srcPid == myPid_ && ip == myBackupIp_) return;
96  auto key = make_pair(ip, t.port);
97  if (recvSessions_.find(key) == recvSessions_.end()) {
98  for (auto i = 0u; i < t.typeTagCountContained; ++i) {
99  if (subscriptions_.check(t.typeTags[i])) {
100  try {
101  typename Session::ptr sess {
102  new Session(
103  config_
104  , t.sendFrom
105  , subscriptions_
106  , outputBuffer_
107  , t.recvReportDelay
108  )
109  };
110  sess->start(ip, t.port);
111  recvSessions_[key] = sess;
112  ep2SessionDict_[t.sendFrom] = sess;
113  schedule(SysTime::now(), *sess);
114  } catch (std::exception const& e) {
115  HMBDC_LOG_C(e.what());
116  }
117  break;
118  }
119  }
120  } //else ignore
121  }
122 
123  /**
124  * @brief check how many other parties are sending to this engine
125  * @return recipient session count are still active
126  */
127  size_t sessionsRemainingActive() const {
128  return recvSessions_.size();
129  }
130 
131  template <MessageTupleC Messages, typename CcNode>
132  void subscribeFor(CcNode const& node, uint16_t mod, uint16_t res) {
133  subscriptions_.addSubsFor<Messages>(node, mod, res);
134  }
135 
136 private:
138 
139  pattern::MonoLockFreeBuffer cmdBuffer_;
140  OutputBuffer& outputBuffer_;
141  TypeTagSet subscriptions_;
142  uint64_t myBackupIp_;
143  using Ep2SessionDict //sender's ip address port ->
144  = boost::unordered_map<uint32_t, typename Session::ptr>;
145  Ep2SessionDict ep2SessionDict_;
146  boost::unordered_map<pair<uint64_t, uint16_t>
147  , typename Session::ptr> recvSessions_;
149  bool loopback_;
150  uint32_t myPid_ = 0;
151 };
152 
153 template <typename OutputBuffer, typename AttachmentAllocator>
155 : RecvTransportImpl<OutputBuffer, AttachmentAllocator>
156 , Client<RecvTransportEngine<OutputBuffer, AttachmentAllocator>> {
158  using Impl::Impl;
159 
160  void rotate() {
161  this->checkTimers(time::SysTime::now());
163  }
164 
165  /*virtual*/
166  void invokedCb(size_t) HMBDC_RESTRICT override {
167  Impl::runOnce();
168  }
169 
170  /*virtual*/
171  void stoppedCb(std::exception const& e) override {
172  HMBDC_LOG_C(e.what());
173  };
174 
175  using EngineTransport::hmbdcName;
176 
177  std::tuple<char const*, int> schedSpec() const {
178  return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
179  }
180 };
181 } //recvtransportengine_detail
182 
183 template <typename OutputBuffer, typename AttachmentAllocator>
184 using RecvTransportEngine = recvtransportengine_detail::RecvTransportEngine<
185  OutputBuffer, AttachmentAllocator>;
186 }}}
Definition: MonoLockFreeBuffer.hpp:16
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
void setAdditionalFallbackConfig(Config const &c)
set additional defaults
Definition: Config.hpp:154
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
Config & resetSection(char const *section, bool sectionExists=true)
change section name
Definition: Config.hpp:177
impl class
Definition: NmRecvTransport.hpp:31
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: RecvTransportEngine.hpp:171
void handleMessageCb(TypeTagBackupSource const &t)
only used by MD
Definition: RecvTransportEngine.hpp:88
Definition: MessageDispacher.hpp:184
Definition: Message.hpp:212
Definition: Messages.hpp:116
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: RecvTransportEngine.hpp:166
Definition: TypeTagSet.hpp:141
Definition: Message.hpp:263
Definition: RecvTransportEngine.hpp:21
Definition: Rater.hpp:10
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
size_t sessionsRemainingActive() const
check how many other parties are sending to this engine
Definition: RecvTransportEngine.hpp:127
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89