hmbdc
simplify-high-performance-messaging-programming
BackupRecvSessionT.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/reliable/TcpEpollFd.hpp"
4 #include "hmbdc/tips/reliable/AttBufferAdaptor.hpp"
5 #include "hmbdc/app/Message.hpp"
6 #include "hmbdc/app/Config.hpp"
7 #include "hmbdc/app/Logger.hpp"
8 #include "hmbdc/pattern/SeqArb.hpp"
9 #include "hmbdc/time/Time.hpp"
10 #include "hmbdc/comm/inet/Misc.hpp"
11 
12 #include <unordered_set>
13 #include <boost/lexical_cast.hpp>
14 
15 #include <functional>
16 #include <memory>
17 #include <utility>
18 #include <regex>
19 
20 #include <iostream>
21 
22 namespace hmbdc { namespace tips { namespace reliable {
23 
24 using Tags = std::unordered_set<uint16_t>;
25 
26 namespace backuprecvsessiont_detail {
27 using namespace std;
28 using namespace hmbdc::app;
29 using namespace hmbdc::time;
30 
31 template <typename TransportMessageHeader
32  , typename SessionStarted
33  , typename SessionDropped
34  , typename SeqAlert
35  , typename OutputBufferIn
36  , typename SendFrom
37  , typename AttachmentAllocator>
40  using ptr = shared_ptr<BackupRecvSessionT<TransportMessageHeader
41  , SessionStarted
42  , SessionDropped
43  , SeqAlert
44  , OutputBufferIn
45  , SendFrom
46  , AttachmentAllocator>>;
47  BackupRecvSessionT(Config const& config
48  , SendFrom const& sendFrom
49  , TypeTagSet const& subscriptions
50  , OutputBufferIn& outputBuffer
51  , time::Duration recvReportDelay)
52  : ReoccuringTimer(recvReportDelay)
53  , sendFrom(sendFrom)
54  , config_(config)
55  , minSeq_(numeric_limits<HMBDC_SEQ_TYPE>::max())
56  , subscriptions_(subscriptions)
57  , writeFd_(config_)
58  , outputBuffer_(outputBuffer)
59  , bufSize_(config.getExt<size_t>("maxTcpReadBytes"))
60  , buf_((char*)memalign(SMP_CACHE_BYTES, bufSize_))
61  , bufCur_(buf_)
62  , filledLen_(0)
63  , seqArb_(numeric_limits<HMBDC_SEQ_TYPE>::max())
64  , initialized_(false)
65  , ready_(false)
66  , stopped_(false)
67  , recvBackupMessageCount_(0)
68  , gapPending_(false)
69  , gapPendingSeq_(0) {
70  setCallback(
71  [this](TimerManager& tm, SysTime const& now) {
72  sendGapReport(seqArb_.expectingSeq(), 0);
73  }
74  );
75  }
76 
77  virtual ~BackupRecvSessionT() {
78  ::free(buf_);
79  HMBDC_LOG_N("BackupRecvSession retired: ", id(), " lifetime recved:", recvBackupMessageCount_);
80  }
81 
82  void stop() {
83  outputBuffer_.putSome(sessionDropped_);
84  }
85 
86  void start(in_addr_t ip, uint16_t port) {
87  sockaddr_in remoteAddr = {0};
88  remoteAddr.sin_family = AF_INET;
89  remoteAddr.sin_addr.s_addr = ip;
90  remoteAddr.sin_port = htons(port);
91  if (connect(writeFd_.fd, (sockaddr*)&remoteAddr, sizeof(remoteAddr)) < 0) {
92  if (errno != EINPROGRESS) {
93  HMBDC_THROW(runtime_error, "connect fail, errno=" << errno);
94  }
95  }
96 
97  auto forRead = dup(writeFd_.fd);
98  if (forRead == -1) {
99  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
100  }
101 
102  readFd_.fd = forRead;
103  utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
104  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
105  }
106 
107  bool runOnce() {
108  if (hmbdc_unlikely(stopped_)) return false;
109  if (hmbdc_unlikely(!initialized_ && writeFd_.isFdReady())) {
110  try {
111  initializeConn();
112  } catch (std::exception const& e) {
113  HMBDC_LOG_W(e.what());
114  return false;
115  } catch (...) {
116  HMBDC_LOG_C("unknown exception");
117  return false;
118  }
119  }
120  return doRead();
121  }
122 
123  void sendSubscribe(uint16_t t) {
124  char buf[70];
125  auto l = sprintf(buf, "+%d\t", t) - 1;
126  if (hmbdc_unlikely(send(writeFd_.fd, buf, l, MSG_NOSIGNAL) != l)) {
127  HMBDC_LOG_C("error when sending subscribe to ", id());
128  stopped_ = true;
129  }
130  }
131 
132  void sendUnsubscribe(uint16_t const& t) {
133  char buf[70];
134  auto l = sprintf(buf, "-%d\t", t) - 1;
135  if (hmbdc_unlikely(l != send(writeFd_.fd, buf, l, MSG_NOSIGNAL))) {
136  HMBDC_LOG_C("error when sending unsubscribe to ", id(), " errno=", errno);
137  stopped_ = true;
138  }
139  }
140 
141  void sendGapReport(HMBDC_SEQ_TYPE missSeq, size_t len) {
142  if (hmbdc_likely(!gapPending_ && missSeq != numeric_limits<HMBDC_SEQ_TYPE>::max())) {
143  char buf[70];
144  auto l = sprintf(buf, "=%" PRIu64 ",%zu\t", missSeq, len);
145 
146  if (hmbdc_unlikely(l != send(writeFd_.fd, buf, l, MSG_NOSIGNAL))) {
147  HMBDC_LOG_C("error when sending gap report to ", id(), " errno=", errno);
148  stopped_ = true;
149  }
150 
151  if (len) {
152  gapPendingSeq_ = missSeq + len - 1;
153  gapPending_ = true;
154  }
155  }
156  }
157 
158  char const* id() const {
159  return id_.c_str();
160  }
161 
162  int accept(TransportMessageHeader* h) {
163  auto seq = h->getSeq();
164  auto a = arb(0, seq, h);
165  if (a == 1) {
166  auto tag = h->typeTag();
167  if (tag == StartMemorySegTrain::typeTag) {
168  tag = h->template wrapped<StartMemorySegTrain>().inbandUnderlyingTypeTag;
169  }
170  if (subscriptions_.check(tag)) {
171  outputBuffer_.put(h);
172  }
173  }
174  return a;
175  }
176 
177  int arb(uint16_t part, HMBDC_SEQ_TYPE seq
178  , TransportMessageHeader* h) HMBDC_RESTRICT {
179  //UDP packet out of order case
180  if (hmbdc_unlikely(gapPending_ && part == 0)) return 0;
181  if (seq != numeric_limits<HMBDC_SEQ_TYPE>::max()) {
182  auto res = seqArb_(part, seq, [](size_t) {
183  //impossible to get here
184  });
185  if (res == 0) {
186  sendGapReport(seqArb_.expectingSeq(), seq - seqArb_.expectingSeq());
187  } else if (seq == gapPendingSeq_) {
188  gapPending_ = false;
189  }
190  return res;
191  } else if (h->typeTag() == SeqAlert::typeTag) {
192  auto& alert = h->template wrapped<SeqAlert>();
193  auto nextSeq = alert.expectSeq;
194 
195  if (seqArb_.expectingSeq() < nextSeq) {
196  sendGapReport(seqArb_.expectingSeq(), nextSeq - seqArb_.expectingSeq());
197  }
198  // HMBDC_LOG_D(std::this_thread::get_id(), "=-1");
199  return -1;
200  }
201  return 1;
202  }
203 
204  SendFrom const sendFrom;
205 private:
207  void initializeConn() {
208  int flags = fcntl(writeFd_.fd, F_GETFL, 0);
209  flags &= ~O_NONBLOCK;
210  if (fcntl(writeFd_.fd, F_SETFL, flags) < 0) {
211  HMBDC_THROW(std::runtime_error, "fcntl failed errno=" << errno);
212  }
213 
214  auto sz = config_.getExt<int>("tcpRecvBufferBytes");
215  if (sz) {
216  if (setsockopt(readFd_.fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
217  HMBDC_LOG_C("failed to set send buffer size=", sz);
218  }
219  }
220 
221  auto flag = config_.getExt<bool>("nagling")?0:1;
222  if (setsockopt(writeFd_.fd, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)) < 0) {
223  HMBDC_LOG_C("failed to set TCP_NODELAY, errno=", errno);
224  }
225 
226  auto addrPort = hmbdc::comm::inet::getPeerIpPort(writeFd_.fd);
227  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
228  strncpy(sessionStarted_.payload.ip, id_.c_str()
229  , sizeof(sessionStarted_.payload.ip));
230  sessionStarted_.payload.ip[sizeof(sessionStarted_.payload.ip) - 1] = 0;
231  strncpy(sessionDropped_.payload.ip, id_.c_str()
232  , sizeof(sessionDropped_.payload.ip));
233  sessionDropped_.payload.ip[sizeof(sessionDropped_.payload.ip) - 1] = 0;
234  outputBuffer_.putSome(sessionStarted_);
235  sendSubscriptions();
236  initialized_ = true;
237  }
238 
239  void sendSubscriptions() {
240  ostringstream oss;
241  subscriptions_.exportTo([&oss](uint16_t tag, auto&& count) {
242  if (count) {
243  oss << '+' << tag << '\t';
244  }
245  });
246  auto s = oss.str() + "+\t"; //mark all sent with "+\t"
247  auto sz = size_t(send(writeFd_.fd, s.c_str(), s.size(), MSG_NOSIGNAL));
248  if (hmbdc_unlikely(sz != s.size())) {
249  HMBDC_LOG_C("error when sending subscriptions to ", id());
250  stopped_ = true;
251  }
252  }
253 
254  bool
255  doRead() HMBDC_RESTRICT {
256  if (hmbdc_unlikely(seqArb_.expectingSeq() == numeric_limits<HMBDC_SEQ_TYPE>::max()) &&
257  filledLen_ >= sizeof(HMBDC_SEQ_TYPE)) {
258  minSeq_ = *reinterpret_cast<HMBDC_SEQ_TYPE*>(bufCur_);
259  auto wireSize = sizeof(HMBDC_SEQ_TYPE);
260  bufCur_ += wireSize;
261  filledLen_ -= wireSize;
262  HMBDC_LOG_N("BackupRecvSession started ", id(), " with minSeq=", minSeq_);
263  seqArb_.expectingSeq() = minSeq_;
264  }
265  while (minSeq_ != numeric_limits<HMBDC_SEQ_TYPE>::max()
266  && filledLen_ >= sizeof(TransportMessageHeader)) {
267  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
268  auto wireSize = h->wireSize();
269 
270  if (hmbdc_likely(filledLen_ >= wireSize)) {
271  if (hmbdc_likely(h->messagePayloadLen //0 for flushing - no typeTag
272  && subscriptions_.check(h->typeTag()))) {
273  outputBuffer_.put(h);
274  }
275  bufCur_ += wireSize;
276  //memmove(buf_, buf_ + h->wireSize(), filledLen_ - h->wireSize());
277  filledLen_ -= wireSize;
278 
279  arb(1, seqArb_.expectingSeq(), h); //just punch the card
280  recvBackupMessageCount_++;
281  } else {
282  break;
283  }
284  }
285  memmove(buf_, bufCur_, filledLen_);
286  bufCur_ = buf_;
287 
288  if (readFd_.isFdReady()) {
289  auto l = recv(readFd_.fd, buf_ + filledLen_, bufSize_ - filledLen_
290  , MSG_NOSIGNAL|MSG_DONTWAIT);
291  if (hmbdc_unlikely(l < 0)) {
292  if (hmbdc_unlikely(!readFd_.checkErr())) {
293  HMBDC_LOG_C("recv failed errno=", errno);
294  return false;
295  }
296  return true;
297  } else if (hmbdc_unlikely(l == 0)) {
298  HMBDC_LOG_W("peer dropped:", id());
299  return false;
300  }
301  filledLen_ += l;
302  }
303 
304  return true;
305  }
306 
307  Config const& config_;
308  HMBDC_SEQ_TYPE minSeq_;
309  TypeTagSet const& subscriptions_;
310  utils::EpollFd readFd_;
311  TcpEpollFd writeFd_;
312  OutputBuffer outputBuffer_;
313  string id_;
314  MessageWrap<SessionStarted> sessionStarted_;
315  MessageWrap<SessionDropped> sessionDropped_;
316  size_t const bufSize_;
317  char* buf_;
318  char* bufCur_;
319  size_t filledLen_;
321  bool initialized_;
322  bool ready_;
323  bool stopped_;
324  size_t recvBackupMessageCount_;
325  bool gapPending_; //a gap is reported and not filled yet
326  HMBDC_SEQ_TYPE gapPendingSeq_;
327  uint16_t attTypeTag_;
328  void* attUserScratchpad_;
329 };
330 } //backuprecvsessiont_detail
331 
332 template <typename TransportMessageHeader
333  , typename SessionStarted
334  , typename SessionDropped
335  , typename SeqAlert
336  , typename OutputBuffer
337  , typename SendFrom
338  , typename AttachmentAllocator>
339 using BackupRecvSessionT =
341  TransportMessageHeader
342  , SessionStarted
343  , SessionDropped
344  , SeqAlert
345  , OutputBuffer
346  , SendFrom
347  , AttachmentAllocator
348  >;
349 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: TcpEpollFd.hpp:14
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
Definition: EpollTask.hpp:87
Definition: Time.hpp:14
Definition: TypeTagSet.hpp:141
Definition: Time.hpp:134
Definition: Rater.hpp:10
Definition: Base.hpp:12
Definition: Timers.hpp:117