hmbdc
simplify-high-performance-messaging-programming
RecvSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/os/DownloadFile.hpp"
8 #include "hmbdc/os/DownloadMemory.hpp"
9 #include "hmbdc/comm/inet/Misc.hpp"
10 
11 #include "hmbdc/app/tcpcast/RecvSession.hpp"
12 
13 #include <boost/lexical_cast.hpp>
14 
15 #include <functional>
16 #include <memory>
17 #include <utility>
18 #include <regex>
19 
20 #include <iostream>
21 
22 namespace hmbdc { namespace app { namespace tcpcast {
23 
24 namespace recvsession_detail {
25 using namespace std;
26 using namespace hmbdc::text;
27 
28 template <typename OutputBuffer, typename MsgArbitrator>
29 struct RecvSession {
30  using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
31  using CleanupFunc = std::function<void()>;
32  RecvSession(Config const& config
33  , StringTrieSet const& subscriptions
34  , std::regex const& topicRegex
35  , OutputBuffer& outputBuffer
36  , uint64_t connKey
37  , MsgArbitrator& arb)
38  : config_(config)
39  , subscriptions_(subscriptions)
40  , topicRegex_(topicRegex)
41  , writeFd_(config)
42  , arb_(arb)
43  , outputBuffer_(outputBuffer)
44  , connKey_(connKey)
45  , initialized_(false)
46  , stopped_(false)
47  , bufSize_(config.getExt<size_t>("maxTcpReadBytes"))
48  , buf_((char*)memalign(SMP_CACHE_BYTES, bufSize_))
49  , bufCur_(buf_)
50  , filledLen_(0)
51  , currTransportHeadFlag_(0) {
52  config(allowRecvMemoryAttachment_, "allowRecvMemoryAttachment");
53  }
54 
55  ~RecvSession() {
56  free(buf_);
57  HMBDC_LOG_N("RecvSession retired: ", id());
58  }
59 
60  void start(in_addr_t ip, uint16_t port) {
61  sockaddr_in remoteAddr = {0};
62  remoteAddr.sin_family = AF_INET;
63  remoteAddr.sin_addr.s_addr = ip;
64  remoteAddr.sin_port = htons(port);
65  if (connect(writeFd_.fd, (sockaddr*)&remoteAddr, sizeof(remoteAddr)) < 0) {
66  if (errno != EINPROGRESS) {
67  HMBDC_THROW(runtime_error, "connect fail, errno=" << errno);
68  }
69  }
70 
71  auto forRead = dup(writeFd_.fd);
72  if (forRead == -1) {
73  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
74  }
75 
76  readFd_.fd = forRead;
77  utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
78  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
79  }
80 
81  void stop() {
82  if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
83  new (*itPending_) MessageWrap<Flush>;
84  outputBuffer_.commit(itPending_);
85  currTransportHeadFlag_ = 0;
86  }
87  outputBuffer_.putSome(sessionDropped_);
88  }
89 
90  void sendSubscribe(Topic const& t) {
91  if (hmbdc_unlikely(std::regex_match(t.c_str(), topicRegex_))) {
92  char buf[70];
93  buf[0]='+';
94  auto l = int(t.copyTo(buf + 1));
95  buf[l + 1] = '\t';
96  if (hmbdc_unlikely(send(writeFd_.fd, buf, l + 2, MSG_NOSIGNAL) != l + 2)) {
97  HMBDC_LOG_C("error when sending subscribe to ", id());
98  stopped_ = true;
99  }
100  }
101  }
102 
103 
104  void sendUnsubscribe(Topic const& t) {
105  if (hmbdc_unlikely(std::regex_match(t.c_str(), topicRegex_))) {
106  char buf[70];
107  buf[0]='-';
108  auto l = int(t.copyTo(buf + 1));
109  buf[l + 1] = '\t';
110  if (hmbdc_unlikely(l + 2 != send(writeFd_.fd, buf, l + 2, MSG_NOSIGNAL))) {
111  HMBDC_LOG_C("error when sending unsubscribe to ", id(), " errno=", errno);
112  stopped_ = true;
113  }
114  }
115  }
116 
117  void heartbeat() {
118  if (!initialized_) return;
119  char const* hb = "+\t";
120  if (hmbdc_unlikely(2 != send(writeFd_.fd, hb, 2, MSG_NOSIGNAL))){
121  HMBDC_LOG_C("error when sending heartbeat to ", id(), " errno=", errno);
122  stopped_ = true;
123  }
124  }
125 
126  char const* id() const {
127  return id_.c_str();
128  }
129 
130  bool runOnce() {
131  if (hmbdc_unlikely(stopped_)) return false;
132  if (hmbdc_unlikely(!initialized_ && writeFd_.isFdReady())) {
133  try {
134  initializeConn();
135  } catch (std::exception const& e) {
136  HMBDC_LOG_W(e.what());
137  return false;
138  } catch (...) {
139  HMBDC_LOG_C("unknown exception");
140  return false;
141  }
142  }
143  return doRead();
144  }
145 
146 private:
147  void initializeConn() {
148  int flags = fcntl(writeFd_.fd, F_GETFL, 0);
149  flags &= ~O_NONBLOCK;
150  if (fcntl(writeFd_.fd, F_SETFL, flags) < 0) {
151  HMBDC_THROW(std::runtime_error, "fcntl failed errno=" << errno);
152  }
153 
154  auto sz = config_.getExt<int>("tcpRecvBufferBytes");
155  if (sz) {
156  if (setsockopt(readFd_.fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
157  HMBDC_LOG_C("failed to set send buffer size=", sz);
158  }
159  }
160 
161  auto addrPort = hmbdc::comm::inet::getPeerIpPort(writeFd_.fd);
162  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
163  strncpy(sessionStarted_.payload.ip, id_.c_str()
164  , sizeof(sessionStarted_.payload.ip));
165  sessionStarted_.payload.ip[sizeof(sessionStarted_.payload.ip) - 1] = 0;
166  strncpy(sessionDropped_.payload.ip, id_.c_str()
167  , sizeof(sessionDropped_.payload.ip));
168  sessionDropped_.payload.ip[sizeof(sessionDropped_.payload.ip) - 1] = 0;
169  outputBuffer_.putSome(sessionStarted_);
170  HMBDC_LOG_N("RecvSession started: ", id());
171  sendSubscriptions();
172  initialized_ = true;
173  }
174 
175  void sendSubscriptions() {
176  ostringstream oss;
177  oss << '@' << connKey_ << '\t';
178  for (auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
179  auto const& t = it->first;
180  if (std::regex_match(t.c_str(), topicRegex_)) {
181  oss << '+' << t;
182  if (it->second) {
183  oss << '*';
184  }
185  oss << '\t';
186  }
187  }
188  auto s = oss.str() + "+\t"; //mark all sent with "+\t"
189  auto sz = size_t(send(writeFd_.fd, s.c_str(), s.size(), MSG_NOSIGNAL));
190  if (hmbdc_unlikely(sz != s.size())) {
191  HMBDC_LOG_C("error when sending subscriptions to ", id());
192  stopped_ = true;
193  }
194  }
195 
196  bool doRead() {
197  do {
198  if (hmbdc_unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag
199  && filledLen_)) {
200  auto s = memoryAttachment_.write(bufCur_, filledLen_);
201  if (memoryAttachment_.writeDone()) {
202  memoryAttachment_.close();
203  currTransportHeadFlag_ = 0;
204  outputBuffer_.commit(itPending_);
205  }
206  bufCur_ += s;
207  filledLen_ -= s;
208  }
209  while (currTransportHeadFlag_ == 0
210  && filledLen_ >= sizeof(TransportMessageHeader)) {
211  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
212  auto wireSize = h->wireSize();
213  if (hmbdc_likely(filledLen_ >= wireSize)) {
214  if (hmbdc_likely(subscriptions_.check(h->topic()))) {
215  auto a = arb_(h);
216  if (hmbdc_likely(a > 0)) {
217  if (hmbdc_unlikely(h->flag == hasMemoryAttachment::flag
218  && allowRecvMemoryAttachment_.find(h->typeTag()) != allowRecvMemoryAttachment_.end())) {
219  currTransportHeadFlag_ = h->flag;
220  itPending_ = outputBuffer_.claim();
221  char* b = static_cast<char*>(*itPending_);
222  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
223  memcpy(b, h->payload(), l);
224  auto& att =
225  static_cast<app::MessageWrap<hasMemoryAttachment>*>(*itPending_)->payload;
226  att.hasMemoryAttachment::attachment = memoryAttachment_.open(att.len);
227  att.hasMemoryAttachment::afterConsumedCleanupFunc = hasMemoryAttachment::free;
228  } else if (h->flag != hasMemoryAttachment::flag) {
229  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
230  outputBuffer_.put(h->payload(), l);
231  currTransportHeadFlag_ = 0;
232  } //else ignore
233  } else if (hmbdc_unlikely(a == 0)) {
234  //cannot decide now - keep bufCur_ filledLen_ unchanged
235  //try it later, use a timer since there might be no more bytes arriving
236  return true;
237  } //else move to next msg
238  }
239  bufCur_ += wireSize;
240  //memmove(buf_, buf_ + h->wireSize(), filledLen_ - h->wireSize());
241  filledLen_ -= wireSize;
242  } else {
243  break;
244  }
245  }
246  memmove(buf_, bufCur_, filledLen_);
247  bufCur_ = buf_;
248 
249  if (readFd_.isFdReady()) {
250  auto l = recv(readFd_.fd, buf_ + filledLen_, bufSize_ - filledLen_
251  , MSG_NOSIGNAL|MSG_DONTWAIT);
252  if (hmbdc_unlikely(l < 0)) {
253  if (hmbdc_unlikely(!readFd_.checkErr())) {
254  HMBDC_LOG_C("recv failed errno=", errno);
255  return false;
256  }
257  } else if (hmbdc_unlikely(l == 0)) {
258  HMBDC_LOG_W("peer dropped:", id());
259  return false;
260  } else {
261  filledLen_ += l;
262  }
263  }
264  } while (filledLen_);
265  return true;
266  }
267 
268  Config const& config_;
269  StringTrieSet const& subscriptions_;
270  std::regex topicRegex_;
271  utils::EpollFd readFd_;
272  EpollFd writeFd_;
273  MsgArbitrator& arb_;
274  OutputBuffer& outputBuffer_;
275  uint64_t connKey_;
276  string id_;
277  MessageWrap<SessionStarted> sessionStarted_;
278  MessageWrap<SessionDropped> sessionDropped_;
279  bool initialized_;
280  bool stopped_;
281  size_t const bufSize_;
282  char* buf_;
283  char* bufCur_;
284  size_t filledLen_;
285  uint8_t currTransportHeadFlag_;
286  typename OutputBuffer::iterator itPending_;
287 
288  std::unordered_set<uint16_t> allowRecvMemoryAttachment_;
289  os::DownloadMemory memoryAttachment_;
290 };
291 } //recvsession_detail
292 template <typename OutputBuffer, typename MsgArbitrator>
294 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:225
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:76
Definition: EpollTask.hpp:70
Definition: DownloadMemory.hpp:10
Definition: Transport.hpp:24
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:112
Definition: Base.hpp:13
Definition: LfbStream.hpp:11