hmbdc
simplify-high-performance-messaging-programming
RecvSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/tips/tcpcast/Transport.hpp"
5 #include "hmbdc/tips/tcpcast/Messages.hpp"
6 #include "hmbdc/tips/TypeTagSet.hpp"
7 #include "hmbdc/comm/inet/Misc.hpp"
8 
9 
10 #include <boost/lexical_cast.hpp>
11 
12 #include <functional>
13 #include <memory>
14 #include <utility>
15 #include <regex>
16 
17 #include <iostream>
18 
19 namespace hmbdc { namespace tips { namespace tcpcast {
20 
21 namespace recvsession_detail {
22 
23 using namespace std;
24 using namespace hmbdc::app;
25 
26 template <typename OutputBuffer, typename AttachmentAllocator>
27 struct RecvSession {
28  using ptr = shared_ptr<RecvSession<OutputBuffer, AttachmentAllocator>>;
29  using CleanupFunc = std::function<void()>;
30  RecvSession(Config const& config
31  , TypeTagSet const& subscriptions
32  , OutputBuffer& outputBuffer)
33  : config_(config)
34  , subscriptions_(subscriptions)
35  , writeFd_(config)
36  , outputBuffer_(outputBuffer)
37  , initialized_(false)
38  , stopped_(false)
39  , bufSize_(config.getExt<size_t>("maxTcpReadBytes"))
40  , buf_((char*)memalign(SMP_CACHE_BYTES, bufSize_))
41  , bufCur_(buf_)
42  , filledLen_(0)
43  , currTransportHeadFlag_(0)
44  , hasMemoryAttachmentItemSpace_(
45  new char[std::max(outputBuffer_.maxItemSize()
46  , sizeof(hasMemoryAttachment) + sizeof(MessageHead))] {0})
47  , hasMemoryAttachment_(&((MessageHead*)(hasMemoryAttachmentItemSpace_.get()))
48  ->get<hasMemoryAttachment>()){
49  }
50 
51  ~RecvSession() {
52  free(buf_);
53  hasMemoryAttachment_->release();
54  HMBDC_LOG_N("RecvSession retired: ", id());
55  }
56 
57  void start(in_addr_t ip, uint16_t port) {
58  sockaddr_in remoteAddr = {0};
59  remoteAddr.sin_family = AF_INET;
60  remoteAddr.sin_addr.s_addr = ip;
61  remoteAddr.sin_port = htons(port);
62  if (connect(writeFd_.fd, (sockaddr*)&remoteAddr, sizeof(remoteAddr)) < 0) {
63  if (errno != EINPROGRESS) {
64  HMBDC_THROW(runtime_error, "connect fail, errno=" << errno);
65  }
66  }
67 
68  auto forRead = dup(writeFd_.fd);
69  if (forRead == -1) {
70  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
71  }
72 
73  readFd_.fd = forRead;
74  utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
75  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
76  }
77 
78  void stop() {
79  if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
80  currTransportHeadFlag_ = 0;
81  }
82  outputBuffer_.putSome(sessionDropped_);
83  }
84 
85  void heartbeat() {
86  if (!initialized_) return;
87  char const* hb = "+\t";
88  if (hmbdc_unlikely(2 != send(writeFd_.fd, hb, 2, MSG_NOSIGNAL))){
89  HMBDC_LOG_C("error when sending heartbeat to ", id(), " errno=", errno);
90  stopped_ = true;
91  }
92  }
93 
94  char const* id() const {
95  return id_.c_str();
96  }
97 
98  bool runOnce() {
99  if (hmbdc_unlikely(stopped_)) return false;
100  if (hmbdc_unlikely(!initialized_ && writeFd_.isFdReady())) {
101  try {
102  initializeConn();
103  } catch (std::exception const& e) {
104  HMBDC_LOG_W(e.what());
105  return false;
106  } catch (...) {
107  HMBDC_LOG_C("unknown exception");
108  return false;
109  }
110  }
111  return doRead();
112  }
113 
114 private:
115  void initializeConn() {
116  int flags = fcntl(writeFd_.fd, F_GETFL, 0);
117  flags &= ~O_NONBLOCK;
118  if (fcntl(writeFd_.fd, F_SETFL, flags) < 0) {
119  HMBDC_THROW(std::runtime_error, "fcntl failed errno=" << errno);
120  }
121 
122  auto sz = config_.getExt<int>("tcpRecvBufferBytes");
123  if (sz) {
124  if (setsockopt(readFd_.fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
125  HMBDC_LOG_C("failed to set send buffer size=", sz);
126  }
127  }
128 
129  auto addrPort = hmbdc::comm::inet::getPeerIpPort(writeFd_.fd);
130  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
131  strncpy(sessionStarted_.payload.ip, id_.c_str()
132  , sizeof(sessionStarted_.payload.ip));
133  sessionStarted_.payload.ip[sizeof(sessionStarted_.payload.ip) - 1] = 0;
134  strncpy(sessionDropped_.payload.ip, id_.c_str()
135  , sizeof(sessionDropped_.payload.ip));
136  sessionDropped_.payload.ip[sizeof(sessionDropped_.payload.ip) - 1] = 0;
137  outputBuffer_.putSome(sessionStarted_);
138  HMBDC_LOG_N("RecvSession started: ", id());
139  sendSubscriptions();
140  initialized_ = true;
141  }
142 
143  void sendSubscriptions() {
144  ostringstream oss;
145  subscriptions_.exportTo([&oss](uint16_t tag, auto&& count) {
146  if (count) {
147  oss << '+' << tag << '\t';
148  }
149  });
150  auto s = oss.str() + "+\t"; //mark all sent with "+\t"
151  auto sz = size_t(send(writeFd_.fd, s.c_str(), s.size(), MSG_NOSIGNAL));
152  if (hmbdc_unlikely(sz != s.size())) {
153  HMBDC_LOG_C("error when sending subscriptions to ", id());
154  stopped_ = true;
155  }
156  }
157 
158  bool doRead() {
159  do {
160  if (hmbdc_unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag
161  && filledLen_)) {
162  auto s = memoryAttachment_.write(bufCur_, filledLen_);
163  if (memoryAttachment_.writeDone()) {
164  memoryAttachment_.close();
165  currTransportHeadFlag_ = 0;
166  outputBuffer_.putAtt((MessageWrap<hasMemoryAttachment>*)
167  hasMemoryAttachmentItemSpace_.get(), outputBuffer_.maxItemSize());
168  }
169  bufCur_ += s;
170  filledLen_ -= s;
171  }
172  while (currTransportHeadFlag_ == 0
173  && filledLen_ >= sizeof(TransportMessageHeader)) {
174  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
175  auto wireSize = h->wireSize();
176  if (hmbdc_likely(filledLen_ >= wireSize)) {
177  if (hmbdc_likely(subscriptions_.check(h->typeTag()))) {
178  if (hmbdc_unlikely(h->flag == hasMemoryAttachment::flag)) {
179  currTransportHeadFlag_ = h->flag;
180  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
181  memcpy(hasMemoryAttachmentItemSpace_.get(), h->payload(), l);
182  memoryAttachment_.open(h->typeTag(), hasMemoryAttachment_);
183  // hasMemoryAttachment_->attachment = memoryAttachment_.open(hasMemoryAttachment_->len);
184  // hasMemoryAttachment_->afterConsumedCleanupFunc
185  // = [](hasMemoryAttachment* a) {::free(a->attachment);a->attachment = nullptr;};
186  } else {
187  auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
188  outputBuffer_.put(h->payload(), l);
189  currTransportHeadFlag_ = 0;
190  }
191  }
192  bufCur_ += wireSize;
193  //memmove(buf_, buf_ + h->wireSize(), filledLen_ - h->wireSize());
194  filledLen_ -= wireSize;
195  } else {
196  break;
197  }
198  }
199  memmove(buf_, bufCur_, filledLen_);
200  bufCur_ = buf_;
201 
202  if (readFd_.isFdReady()) {
203  auto l = recv(readFd_.fd, buf_ + filledLen_, bufSize_ - filledLen_
204  , MSG_NOSIGNAL|MSG_DONTWAIT);
205  if (hmbdc_unlikely(l < 0)) {
206  if (hmbdc_unlikely(!readFd_.checkErr())) {
207  HMBDC_LOG_C("recv failed errno=", errno);
208  return false;
209  }
210  } else if (hmbdc_unlikely(l == 0)) {
211  HMBDC_LOG_W("peer dropped:", id());
212  return false;
213  } else {
214  filledLen_ += l;
215  }
216  } else {
217  return true; //has done what can be done
218  }
219  } while (filledLen_);
220  return true;
221  }
222 
223  Config const& config_;
224  TypeTagSet const& subscriptions_;
225  utils::EpollFd readFd_;
226  EpollFd writeFd_;
227  OutputBuffer& outputBuffer_;
228  string id_;
229  MessageWrap<SessionStarted> sessionStarted_;
230  MessageWrap<SessionDropped> sessionDropped_;
231  bool initialized_;
232  bool stopped_;
233  size_t const bufSize_;
234  char* buf_;
235  char* bufCur_;
236  size_t filledLen_;
237  uint8_t currTransportHeadFlag_;
238  std::unique_ptr<char[]> hasMemoryAttachmentItemSpace_;
239  hasMemoryAttachment* hasMemoryAttachment_;
240 
241  struct DownloadMemory {
243  : addr_(nullptr)
244  , fullLen_(0)
245  , len_(0) {
246  }
247 
248  void* open(uint16_t typeTag, app::hasMemoryAttachment* att) {
249  addr_ = (char*)alloc_(typeTag, att);
250  if (addr_) {
251  fullLen_ = att->len;
252  len_ = 0;
253  }
254  return addr_;
255  }
256 
257  bool writeDone() const {
258  return fullLen_ == len_;
259  }
260 
261  size_t write(void const* mem, size_t l) {
262  auto wl = std::min(l, fullLen_ - len_);
263  if (addr_) {
264  memcpy(addr_ + len_, mem, wl);
265  }
266  len_ += wl;
267  return wl;
268  }
269 
270  void close() {
271  addr_ = nullptr;
272  fullLen_ = 0;
273  len_ = 0;
274  }
275 
276  private:
277  AttachmentAllocator alloc_;
278  char* addr_;
279  size_t fullLen_;
280  size_t len_;
281  };
282  DownloadMemory memoryAttachment_;
283 };
284 } //recvsession_detail
285 template <typename OutputBuffer, typename AttachmentAllocator>
287 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: TypedString.hpp:84
Definition: EpollTask.hpp:87
Definition: Message.hpp:212
Definition: Transport.hpp:20
Definition: TypeTagSet.hpp:141
Definition: Message.hpp:263
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Base.hpp:12