1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/tips/reliable/TcpEpollFd.hpp" 4 #include "hmbdc/tips/TypeTagSet.hpp" 5 #include "hmbdc/app/Logger.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/time/Rater.hpp" 8 #include "hmbdc/pattern/LockFreeBufferT.hpp" 9 #include "hmbdc/comm/inet/Misc.hpp" 11 #include <boost/circular_buffer.hpp> 12 #include <unordered_set> 16 #include <netinet/tcp.h> 18 namespace hmbdc {
namespace tips {
namespace reliable {
20 using ToCleanupAttQueue = boost::circular_buffer<std::tuple<
22 , app::hasMemoryAttachment*
23 , app::hasMemoryAttachment::AfterConsumedCleanupFunc
27 namespace backupsendservert_detail {
31 template <
typename TypeTagBackupSource,
typename BackupSendSession>
36 , serverAddr_(serverFd_.localAddr) {
38 if (listen(serverFd_.fd, 10) < 0) {
39 HMBDC_THROW(runtime_error,
"failed to listen, errno=" << errno);
41 utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, serverFd_);
45 decltype(advertisingMessages_) newAds;
46 tts.exportTo([&](uint16_t tag,
auto&& subCount) {
47 if (newAds.size() == 0
48 || !newAds.rbegin()->addTypeTag(tag)) {
49 newAds.emplace_back(serverFd_.localIp
51 , time::Duration::microseconds(config_.getExt<uint32_t>(
"recvReportDelayMicrosec"))
52 , config_.getExt<
bool>(
"loopback"));
53 newAds.rbegin()->addTypeTag(tag);
56 std::swap(advertisingMessages_, newAds);
57 for (
auto& m : advertisingMessages_) {
58 HMBDC_LOG_N(
"adverise at ", m);
62 auto const& advertisingMessages()
const {
63 return advertisingMessages_;
69 sockaddr_in& serverAddr_;
70 std::vector<TypeTagBackupSource> advertisingMessages_;
74 template <
typename TypeTagBackupSource,
typename BackupSendSession>
77 using Buffer =
typename BackupSendSession::Buffer;
81 , ToCleanupAttQueue& toCleanupAttQueue
86 , maxSendBatch_(cfg.
getExt<uint32_t>(
"maxSendBatch"))
87 , toCleanupAttQueue_(toCleanupAttQueue)
88 , replayHistoryForNewRecv_(cfg.
getExt<
size_t>(
"replayHistoryForNewRecv"))
89 , outboundSubscriptions_(outboundSubscriptions) {
90 leastIt_.seq_ = numeric_limits<HMBDC_SEQ_TYPE>::max();
95 void runOnce() HMBDC_RESTRICT {
99 auto newSeq_= buffer_.readSeq(0);
100 for (
auto it = sessions_.begin(); it != sessions_.end();) {
101 if (hmbdc_unlikely(!(*it)->runOnce())) {
103 sessions_.erase(it++);
105 if (newSeq_ > (*it)->bufItNext_.seq_) {
106 newSeq_ = (*it)->bufItNext_.seq_;
111 if (hmbdc_likely(replayHistoryForNewRecv_ < newSeq_)) {
112 newSeq_ -= replayHistoryForNewRecv_;
116 if (leastIt_.seq_ != newSeq_) {
117 while (toCleanupAttQueue_.size()) {
118 auto it = toCleanupAttQueue_.begin();
119 if (newSeq_ > get<0>(*it)) {
120 auto f = get<2>(*it);
121 if (f) f(get<1>(*it));
122 toCleanupAttQueue_.pop_front();
127 buffer_.catchUpTo(1, newSeq_);
129 leastIt_.seq_ = newSeq_;
133 while (toCleanupAttQueue_.size()) {
134 auto it = toCleanupAttQueue_.begin();
135 auto f = get<2>(*it);
136 if (f) f(get<1>(*it));
137 toCleanupAttQueue_.pop_front();
141 size_t sessionCount()
const {
142 return sessions_.size();
145 size_t readySessionCount() HMBDC_RESTRICT
const {
147 for (
auto const& s : sessions_) {
148 if (s->ready()) res++;
153 void killSlowestSession() {
154 if (sessions_.size() == 0) {
158 auto it = sessions_.begin();
159 auto leastSessIt = it;
161 for (; it != sessions_.end(); it++) {
162 if ((*it)->bufItNext_ < leastIt) {
163 leastIt = (*it)->bufItNext_;
167 HMBDC_LOG_C((*leastSessIt)->id(),
" too slow, dropping");
168 (*leastSessIt)->stop();
169 sessions_.erase(leastSessIt);
173 void doAccept() HMBDC_RESTRICT {
174 auto& serverFd_ = this->serverFd_;
175 auto& serverAddr_ = this->serverAddr_;
176 if (hmbdc_unlikely(serverFd_.isFdReady())) {
177 auto addrlen =
sizeof(serverAddr_);
178 auto conn = accept(serverFd_.fd, (
struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
180 if (!serverFd_.checkErr()) {
181 HMBDC_LOG_C(
"accept failure, errno=", errno);
185 auto sz = this->config_.template getExt<int>(
"tcpSendBufferBytes");
187 if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz,
sizeof(sz)) < 0) {
188 HMBDC_LOG_C(
"failed to set send buffer size=", sz,
" errno=", errno);
191 int flag = this->config_.template getExt<bool>(
"nagling")?0:1;
192 if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (
char*) &flag,
sizeof(flag)) < 0) {
193 HMBDC_LOG_C(
"failed to set TCP_NODELAY, errno=", errno);
197 buffer_.peek(1, it, it, 0);
198 auto s = std::make_shared<BackupSendSession>(conn
203 , outboundSubscriptions_);
206 }
catch (std::exception
const& e) {
207 HMBDC_LOG_C(e.what());
212 using Sessions = unordered_set<typename BackupSendSession::ptr>;
214 Buffer& HMBDC_RESTRICT buffer_;
217 size_t maxSendBatch_;
218 ToCleanupAttQueue& toCleanupAttQueue_;
219 size_t replayHistoryForNewRecv_;
225 template <
typename TypeTagBackupSource,
typename BackupSendSession>
T getExt(const path_type ¶m, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: TcpEpollFd.hpp:14
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: BackupSendServerT.hpp:32
Definition: TypedString.hpp:84
Definition: BlockingBuffer.hpp:11
Definition: BackupSendServerT.hpp:75
Definition: LockFreeBufferMisc.hpp:89