hmbdc
simplify-high-performance-messaging-programming
BackupSendServerT.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/reliable/TcpEpollFd.hpp"
4 #include "hmbdc/tips/TypeTagSet.hpp"
5 #include "hmbdc/app/Logger.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/time/Rater.hpp"
8 #include "hmbdc/pattern/LockFreeBufferT.hpp"
9 #include "hmbdc/comm/inet/Misc.hpp"
10 
11 #include <boost/circular_buffer.hpp>
12 #include <unordered_set>
13 #include <memory>
14 #include <tuple>
15 #include <utility>
16 #include <netinet/tcp.h>
17 
18 namespace hmbdc { namespace tips { namespace reliable {
19 
20 using ToCleanupAttQueue = boost::circular_buffer<std::tuple<
21  HMBDC_SEQ_TYPE
22  , app::hasMemoryAttachment*
23  , app::hasMemoryAttachment::AfterConsumedCleanupFunc
24  >
25 >;
26 
27 namespace backupsendservert_detail {
28 using namespace hmbdc::pattern;
29 using namespace std;
30 
31 template <typename TypeTagBackupSource, typename BackupSendSession>
33  BackupSendServerT(app::Config const& cfg)
34  : config_(cfg)
35  , serverFd_(config_)
36  , serverAddr_(serverFd_.localAddr) {
37  using namespace app;
38  if (listen(serverFd_.fd, 10) < 0) {
39  HMBDC_THROW(runtime_error, "failed to listen, errno=" << errno);
40  }
41  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, serverFd_);
42  }
43 
44  void advertisingMessages(TypeTagSetST& tts) {
45  decltype(advertisingMessages_) newAds;
46  tts.exportTo([&](uint16_t tag, auto&& subCount) {
47  if (newAds.size() == 0
48  || !newAds.rbegin()->addTypeTag(tag)) {
49  newAds.emplace_back(serverFd_.localIp
50  , serverFd_.localPort
51  , time::Duration::microseconds(config_.getExt<uint32_t>("recvReportDelayMicrosec"))
52  , config_.getExt<bool>("loopback"));
53  newAds.rbegin()->addTypeTag(tag);
54  }
55  });
56  std::swap(advertisingMessages_, newAds);
57  for (auto& m : advertisingMessages_) {
58  HMBDC_LOG_N("adverise at ", m);
59  }
60  }
61 
62  auto const& advertisingMessages() const {
63  return advertisingMessages_;
64  }
65 
66 protected:
67  app::Config config_;
68  TcpEpollFd serverFd_;
69  sockaddr_in& serverAddr_;
70  std::vector<TypeTagBackupSource> advertisingMessages_;
71 };
72 
73 
74 template <typename TypeTagBackupSource, typename BackupSendSession>
76 : BackupSendServerT<TypeTagBackupSource, BackupSendSession> {
77  using Buffer = typename BackupSendSession::Buffer;
79  , Buffer& buffer
80  , time::Rater& rater
81  , ToCleanupAttQueue& toCleanupAttQueue
82  , TypeTagSet& outboundSubscriptions
84  , buffer_(buffer)
85  , rater_(rater)
86  , maxSendBatch_(cfg.getExt<uint32_t>("maxSendBatch"))
87  , toCleanupAttQueue_(toCleanupAttQueue)
88  , replayHistoryForNewRecv_(cfg.getExt<size_t>("replayHistoryForNewRecv"))
89  , outboundSubscriptions_(outboundSubscriptions) {
90  leastIt_.seq_ = numeric_limits<HMBDC_SEQ_TYPE>::max();
91  }
92 
93  virtual ~AsyncBackupSendServerT() = default;
94 
95  void runOnce() HMBDC_RESTRICT {
96  // utils::EpollTask::instance().poll();
97  doAccept();
98  typename Buffer::iterator leastIt;
99  auto newSeq_= buffer_.readSeq(0);
100  for (auto it = sessions_.begin(); it != sessions_.end();) {
101  if (hmbdc_unlikely(!(*it)->runOnce())) {
102  (*it)->stop();
103  sessions_.erase(it++);
104  } else {
105  if (newSeq_ > (*it)->bufItNext_.seq_) {
106  newSeq_ = (*it)->bufItNext_.seq_;
107  }
108  it++;
109  }
110  }
111  if (hmbdc_likely(replayHistoryForNewRecv_ < newSeq_)) {
112  newSeq_ -= replayHistoryForNewRecv_;
113  } else {
114  newSeq_ = 0;
115  }
116  if (leastIt_.seq_ != newSeq_) {
117  while (toCleanupAttQueue_.size()) {
118  auto it = toCleanupAttQueue_.begin();
119  if (newSeq_ > get<0>(*it)) {
120  auto f = get<2>(*it);
121  if (f) f(get<1>(*it));
122  toCleanupAttQueue_.pop_front();
123  } else {
124  break;
125  }
126  }
127  buffer_.catchUpTo(1, newSeq_);
128  }
129  leastIt_.seq_ = newSeq_;
130  }
131 
132  void stop() {
133  while (toCleanupAttQueue_.size()) {
134  auto it = toCleanupAttQueue_.begin();
135  auto f = get<2>(*it);
136  if (f) f(get<1>(*it));
137  toCleanupAttQueue_.pop_front();
138  }
139  }
140 
141  size_t sessionCount() const {
142  return sessions_.size();
143  }
144 
145  size_t readySessionCount() HMBDC_RESTRICT const {
146  size_t res = 0;
147  for (auto const& s : sessions_) {
148  if (s->ready()) res++;
149  }
150  return res;
151  }
152 
153  void killSlowestSession() {
154  if (sessions_.size() == 0) {
155  return;
156  }
157 
158  auto it = sessions_.begin();
159  auto leastSessIt = it;
160  typename Buffer::iterator leastIt = (*it++)->bufItNext_;
161  for (; it != sessions_.end(); it++) {
162  if ((*it)->bufItNext_ < leastIt) {
163  leastIt = (*it)->bufItNext_;
164  leastSessIt = it;
165  }
166  }
167  HMBDC_LOG_C((*leastSessIt)->id(), " too slow, dropping");
168  (*leastSessIt)->stop();
169  sessions_.erase(leastSessIt);
170  }
171 
172 private:
173  void doAccept() HMBDC_RESTRICT {
174  auto& serverFd_ = this->serverFd_;
175  auto& serverAddr_ = this->serverAddr_;
176  if (hmbdc_unlikely(serverFd_.isFdReady())) {
177  auto addrlen = sizeof(serverAddr_);
178  auto conn = accept(serverFd_.fd, (struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
179  if (conn == -1) {
180  if (!serverFd_.checkErr()) {
181  HMBDC_LOG_C("accept failure, errno=", errno);
182  }
183  return;
184  }
185  auto sz = this->config_.template getExt<int>("tcpSendBufferBytes");
186  if (sz) {
187  if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)) < 0) {
188  HMBDC_LOG_C("failed to set send buffer size=", sz, " errno=", errno);
189  }
190  }
191  int flag = this->config_.template getExt<bool>("nagling")?0:1;
192  if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)) < 0) {
193  HMBDC_LOG_C("failed to set TCP_NODELAY, errno=", errno);
194  }
195  try {
196  typename Buffer::iterator it;
197  buffer_.peek(1, it, it, 0);
198  auto s = std::make_shared<BackupSendSession>(conn
199  , buffer_
200  , rater_
201  , it
202  , maxSendBatch_
203  , outboundSubscriptions_);
204  s->start();
205  sessions_.insert(s);
206  } catch (std::exception const& e) {
207  HMBDC_LOG_C(e.what());
208  }
209  };
210  }
211 
212  using Sessions = unordered_set<typename BackupSendSession::ptr>;
213  Sessions sessions_;
214  Buffer& HMBDC_RESTRICT buffer_;
215  time::Rater& HMBDC_RESTRICT rater_;
216  typename Buffer::iterator leastIt_;
217  size_t maxSendBatch_;
218  ToCleanupAttQueue& toCleanupAttQueue_;
219  size_t replayHistoryForNewRecv_;
220  TypeTagSet& outboundSubscriptions_;
221 };
222 
223 } //backupsendservert_detail
224 
225 template <typename TypeTagBackupSource, typename BackupSendSession>
227 
228 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: TcpEpollFd.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: TypedString.hpp:84
Definition: BlockingBuffer.hpp:11
Definition: TypeTagSet.hpp:141
Definition: TypeTagSet.hpp:201
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89