hmbdc
simplify-high-performance-messaging-programming
BackupSendSessionT.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
#include "hmbdc/app/utils/EpollTask.hpp"
#include "hmbdc/app/Base.hpp"

#include "hmbdc/tips/TypeTagSet.hpp"
#include "hmbdc/time/Rater.hpp"
#include "hmbdc/pattern/LockFreeBufferT.hpp"
#include "hmbdc/comm/inet/Misc.hpp"

#include <boost/lexical_cast.hpp>
#include <boost/circular_buffer.hpp>

#include <algorithm>
#include <exception>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include <cinttypes>
#include <cstdio>
#include <cstring>
#include <sys/uio.h>
#include <sys/socket.h>
21 namespace hmbdc { namespace tips { namespace reliable {
22 
// Forward declaration only: this header needs the name of the server template,
// not its definition (presumably so the session can refer to it without
// including the server header - confirm against the server header).
namespace backupsendservert_detail {
    template <class TypeTagBackupSource, class Me> struct AsyncBackupSendServerT;
} // namespace backupsendservert_detail
28 
29 namespace backupsendsessiont_detail {
30 using namespace std;
31 using namespace hmbdc::app;
32 using namespace hmbdc::time;
33 
34 using ToSend = std::vector<iovec>;
35 using ToSendQueue = boost::circular_buffer<tuple<
36  HMBDC_SEQ_TYPE
37  , size_t
38  >
39 >;
40 
41 template <typename TransportMessageHeader>
43  using ptr = shared_ptr<BackupSendSessionT>;
44  using Buffer = reliable::Buffer;
45  BackupSendSessionT(int fd
46  , Buffer const& buffer
47  , Rater& rater
48  , Buffer::iterator bufIt
49  , size_t maxSendBatchHint
50  , TypeTagSet& outboundSubscriptions)
51  : buffer_(buffer)
52  , rater_(rater)
53  , outboundSubscriptions_(outboundSubscriptions)
54  , filledLen_(0)
55  , initialized_(false)
56  , ready_(false)
57  , msghdr_{0}
58  , msghdrRelic_{0}
59  , msghdrRelicSize_(0)
60  , bufIt_(bufIt)
61  , bufItNext_(bufIt)
62  , sendBackupMessageCount_(0)
63  {
64  maxSendBatchHint = std::min(maxSendBatchHint * 2, size_t(UIO_MAXIOV)); //double for attachment
65  msghdrRelic_.msg_iov = new iovec[maxSendBatchHint];
66  auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
67  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
68  auto forRead = dup(fd);
69  if (forRead == -1) {
70  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
71  }
72 
73  readFd_.fd = forRead;
74  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
75  writeFd_.fd = fd;
76  utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
77 
78  toSend_.reserve(maxSendBatchHint);
79  toSendQueue_.set_capacity(buffer.capacity());
80 
81  char* addr = flush_;
82  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
83  h->messagePayloadLen = 0;
84  }
85 
86  virtual ~BackupSendSessionT() {
87  clientSubscriptions_.exportTo([this](uint16_t tag, auto&&) {
88  outboundSubscriptions_.erase(tag);
89  });
90 
91  delete [] msghdrRelic_.msg_iov;
92  HMBDC_LOG_N("BackupSendSessionT retired: ", id(), " lifetime sent:", sendBackupMessageCount_);
93  }
94 
95  void start() {
96  HMBDC_LOG_N("BackupSendSessionT started: ", id(), " minSeq=", bufIt_.seq_);
97  auto l = send(writeFd_.fd, &bufIt_.seq_, sizeof(bufIt_.seq_), MSG_NOSIGNAL);
98  if (l != sizeof(bufIt_.seq_)) {
99  HMBDC_THROW(std::runtime_error, "cannot initialize the new connection, errno=" << errno);
100  }
101  }
102 
103  void stop() {
104  ready_ = false;
105  }
106 
107  char const* id() const {
108  return id_.c_str();
109  }
110 
111  bool ready() const {
112  return ready_;
113  }
114 
115 
116  bool runOnce() HMBDC_RESTRICT {
117  if (hmbdc_unlikely(!doRead())) return false;
118  if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
119  auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
120  if (hmbdc_unlikely(l < 0)) {
121  if (!writeFd_.checkErr()) {
122  HMBDC_LOG_C("sendmsg failed errno=", errno);
123  return false;
124  }
125  return true;
126  }
127  msghdrRelicSize_ -= size_t(l);
128  if (hmbdc_unlikely(msghdrRelicSize_)) {
129  // HMBDC_LOG_C(msghdrRelicSize_);
130  comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
131  return true;
132  } else {
133  bufItNext_ = bufIt_;
134  }
135  }
136  while(!msghdrRelicSize_ && toSendQueue_.size()) {
137  auto it = toSendQueue_.begin();
138  auto seq = get<0>(*it);
139  auto len = get<1>(*it);
140  if (hmbdc_unlikely(seq < bufItNext_.seq_
141  || len > buffer_.capacity()
142  || seq + len > buffer_.readSeq(0))) {
143  //the connection is bad, invalid data, bail out
144  HMBDC_LOG_C(id(), ", received bad data ", seq, ',', len, " stopping connection");
145  return false;
146  }
147 
148  if (hmbdc_unlikely(len)) {
149  bufIt_.seq_ = seq;
150  size_t bytesToSend = 0;
151  toSend_.clear();
152  for (; len && toSend_.size() < toSend_.capacity() - 1; len--) {
153  void* ptr = *bufIt_++;
154  auto item = static_cast<TransportMessageHeader*>(ptr);
155 
156  bool ifResend = clientSubscriptions_.check(item->typeTag());
157  ifResend = ifResend && (item->typeTag() != MemorySeg::typeTag
158  || clientSubscriptions_.check(item->template wrapped<MemorySeg>().inbandUnderlyingTypeTag));
159  ifResend = ifResend && (item->typeTag() != StartMemorySegTrain::typeTag
160  || clientSubscriptions_.check(item->template wrapped<StartMemorySegTrain>().inbandUnderlyingTypeTag));
161 
162  // HMBDC_LOG_N(item->wrapped<SeqMessage>().seq);
163  if (ifResend) {
164  if (hmbdc_unlikely(item->typeTag() == MemorySeg::typeTag)) {
165  toSend_.push_back(iovec{(void*)item->wireBytes(), item->wireSize() - item->wireSizeMemorySeg()});
166  toSend_.push_back(iovec{(void*)item->wireBytesMemorySeg(), item->wireSizeMemorySeg()});
167  } else {
168  toSend_.push_back(iovec{(void*)item->wireBytes(), item->wireSize()});
169  }
170  bytesToSend += item->wireSize();
171  } else { //dont send a message the recv is going to drop, send a flush
172  toSend_.push_back(iovec{(void*)flush_, sizeof(flush_)});
173  bytesToSend += sizeof(flush_);
174  }
175  }
176  msghdr_.msg_iov = &(toSend_[0]);
177  msghdr_.msg_iovlen = toSend_.size();
178  sendBackupMessageCount_ += msghdr_.msg_iovlen;
179  auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
180  if (hmbdc_unlikely(l < 0)) {
181  if (!writeFd_.checkErr()) {
182  HMBDC_LOG_C("sendmsg failed errno=", errno);
183  return false;
184  }
185  return true;
186  }
187  msghdrRelicSize_ = bytesToSend - size_t(l);
188  if (hmbdc_likely(!len)) {
189  toSendQueue_.pop_front();
190  } else {
191  get<0>(*it) += get<1>(*it) - len;
192  get<1>(*it) = len;
193  }
194  rater_.check(bytesToSend << 8u);
195  rater_.commit();
196  if (hmbdc_unlikely(msghdrRelicSize_)) {
197  comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
198  return true;
199  }
200  bufItNext_ = bufIt_;
201  } else {
202  bufItNext_.seq_ = get<0>(*it);
203  toSendQueue_.pop_front();
204  }
205  }
206 
207  return true;
208  }
209 
210 private:
211  template <typename TypeTagBackupSource, typename Me>
213 
214  bool doRead() HMBDC_RESTRICT {
215  if (readFd_.isFdReady()) {
216  auto l = recv(readFd_.fd, data_ + filledLen_, sizeof(data_) - filledLen_
217  , MSG_NOSIGNAL|MSG_DONTWAIT);
218  if (hmbdc_unlikely(l < 0)) {
219  if (!readFd_.checkErr()) {
220  HMBDC_LOG_C("recv failed errno=", errno);
221  return false;
222  }
223  } else if (hmbdc_unlikely(l == 0)) {
224  HMBDC_LOG_W("peer dropped:", id());
225  return false;
226  } else {
227  filledLen_ += l;
228  }
229  }
230  while (filledLen_) {
231  auto p = find(data_, data_ + filledLen_, '\t');
232  if (p != data_ + filledLen_) {
233  *p = '\000';
234  string t(data_ + 1);
235  if (t.size() == 0) {
236  ready_ = true;
237  }
238  else if (data_[0] == '+') {
239  auto tag = (uint16_t)std::stoi(t);
240  if (clientSubscriptions_.set(tag)) {
241  outboundSubscriptions_.add(tag);
242  }
243  } else if (data_[0] == '-') {
244  auto tag = (uint16_t)std::stoi(t);
245  if (clientSubscriptions_.unset(tag)) {
246  outboundSubscriptions_.erase(tag);
247  }
248  } else if (data_[0] == '=') {
249  uint64_t seq;
250  size_t len;
251  if (hmbdc_likely(2 == sscanf(data_ + 1, "%" SCNu64 ",%zu", &seq, &len))) {
252  toSendQueue_.push_back(make_tuple(seq, len));
253  } else {
254  HMBDC_LOG_C(id(), ", received bad data ", data_ + 1, " stopping connection");
255  return false;
256  }
257  } //else ignore
258  memmove(data_, p + 1, filledLen_ - (p - data_ + 1));
259  filledLen_ -= p - data_ + 1;
260  } else {
261  break; //no complete request
262  }
263  }
264  return true;
265  }
266 
267 private:
268  Buffer const& HMBDC_RESTRICT buffer_;
269  Rater& HMBDC_RESTRICT rater_;
270  TypeTagSet clientSubscriptions_;
271  TypeTagSet& outboundSubscriptions_;
272  utils::EpollFd writeFd_;
273  utils::EpollFd readFd_;
274  char data_[16 * 1024];
275  size_t filledLen_;
276  string id_;
277  bool initialized_;
278  bool ready_;
279  ToSend toSend_;
280  ToSendQueue toSendQueue_;
281  msghdr msghdr_;
282  msghdr msghdrRelic_;
283  size_t msghdrRelicSize_;
284  Buffer::iterator bufIt_;
285  Buffer::iterator bufItNext_;
286  char flush_[sizeof(TransportMessageHeader)];
287  size_t sendBackupMessageCount_;
288 };
289 
290 } //backupsendsessiont_detail
291 
292 template <typename TransportMessageHeader>
294 }}}
Definition: Base.hpp:12
Definition: TypedString.hpp:84
Definition: EpollTask.hpp:87
Definition: TypeTagSet.hpp:141
Definition: Rater.hpp:10
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89