1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/utils/EpollTask.hpp" 4 #include "hmbdc/app/Base.hpp" 6 #include "hmbdc/tips/TypeTagSet.hpp" 7 #include "hmbdc/time/Rater.hpp" 8 #include "hmbdc/pattern/LockFreeBufferT.hpp" 9 #include "hmbdc/comm/inet/Misc.hpp" 11 #include <boost/lexical_cast.hpp> 12 #include <boost/circular_buffer.hpp> 19 #include <sys/socket.h> 21 namespace hmbdc {
namespace tips {
namespace reliable {
24 namespace backupsendservert_detail {
// Forward declaration only; the definition of this server template is not
// part of this listing.
25 template <
typename TypeTagBackupSource,
typename Me>
26 struct AsyncBackupSendServerT;
29 namespace backupsendsessiont_detail {
// Batch of scatter/gather entries; runOnce() hands the vector's storage to
// sendmsg() through msghdr_.msg_iov.
34 using ToSend = std::vector<iovec>;
// Queue of pending resend requests. Elements are built with
// make_tuple(seq, len) in doRead() and read with get<0>/get<1> in runOnce().
// NOTE(review): the tuple's element types are cut off in this listing.
35 using ToSendQueue = boost::circular_buffer<tuple<
41 template <
typename TransportMessageHeader>
43 using ptr = shared_ptr<BackupSendSessionT>;
// Constructor fragment. The leading parameters, several member initializers
// and some body lines are missing from this listing.
49 ,
size_t maxSendBatchHint
53 , outboundSubscriptions_(outboundSubscriptions)
62 , sendBackupMessageCount_(0)
// Double the caller's hint, but never exceed UIO_MAXIOV.
64 maxSendBatchHint = std::min(maxSendBatchHint * 2,
size_t(UIO_MAXIOV));
// Raw iovec array owned by this object; the matching delete[] is on
// original line 91.
65 msghdrRelic_.msg_iov =
new iovec[maxSendBatchHint];
// Session id is "<peer ip>:<peer port>".
66 auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
67 id_ = addrPort.first +
":" + std::to_string(addrPort.second);
// A second descriptor for the same socket is obtained with dup().
// NOTE(review): the condition guarding the throw below is missing from the
// listing - presumably it fires when dup() fails.
68 auto forRead = dup(fd);
70 HMBDC_THROW(std::runtime_error,
"dup failed errno=" << errno);
// Register the read side for edge-triggered EPOLLIN and the write side for
// edge-triggered EPOLLOUT.
74 utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
76 utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
// Pre-size the batch vector and let the request queue hold as many entries
// as the message buffer's capacity.
78 toSend_.reserve(maxSendBatchHint);
79 toSendQueue_.set_capacity(buffer.capacity());
// Treat the memory at addr as a TransportMessageHeader and set its payload
// length to zero.
// NOTE(review): where addr comes from is not shown - presumably flush_; confirm.
82 auto h =
reinterpret_cast<TransportMessageHeader*
>(addr);
83 h->messagePayloadLen = 0;
// Teardown fragment; the enclosing function header is missing from this
// listing. NOTE(review): the delete[] below pairs with the constructor's
// new[], so this is presumably the destructor - confirm.
// Erase every tag exported by clientSubscriptions_ from the shared
// outbound subscription set.
87 clientSubscriptions_.exportTo([
this](uint16_t tag,
auto&&) {
88 outboundSubscriptions_.erase(tag);
// Release the iovec array allocated on original line 65.
91 delete [] msghdrRelic_.msg_iov;
92 HMBDC_LOG_N(
"BackupSendSessionT retired: ",
id(),
" lifetime sent:", sendBackupMessageCount_);
// Start-up fragment; the enclosing function header is missing from this
// listing. Logs the session start, then sends the raw bytes of bufIt_.seq_
// to the peer.
96 HMBDC_LOG_N(
"BackupSendSessionT started: ",
id(),
" minSeq=", bufIt_.seq_);
97 auto l = send(writeFd_.fd, &bufIt_.seq_,
sizeof(bufIt_.seq_), MSG_NOSIGNAL);
// Anything other than a complete write (error or short send) is treated as
// a failed handshake and raises std::runtime_error.
98 if (l !=
sizeof(bufIt_.seq_)) {
99 HMBDC_THROW(std::runtime_error,
"cannot initialize the new connection, errno=" << errno);
// Accessor for the session identifier used in the log messages of this
// class. NOTE(review): the body is missing from this listing - presumably
// it returns the C string of id_; confirm.
107 char const* id()
const {
// One iteration of the session's work:
//   1. doRead() services inbound bytes from the peer;
//   2. bytes left unsent by an earlier sendmsg() (the "relic") are retried;
//   3. queued resend requests are served from the message buffer.
// Returns false as soon as doRead() returns false. The other return
// statements, closing braces and else branches are missing from this
// listing, so the nesting described below is partly inferred.
116 bool runOnce() HMBDC_RESTRICT {
117 if (hmbdc_unlikely(!doRead()))
return false;
// Retry the leftover of a previous partial send once the write fd is ready.
118 if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
119 auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
120 if (hmbdc_unlikely(l < 0)) {
121 if (!writeFd_.checkErr()) {
122 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
// NOTE(review): the lines between the log above and the subtraction below
// are missing. If a negative l can reach this point, size_t(l) wraps -
// confirm that the missing lines return or reset l first.
127 msghdrRelicSize_ -= size_t(l);
128 if (hmbdc_unlikely(msghdrRelicSize_)) {
// Still not fully sent: recompute msghdrRelic_ from itself given that l
// bytes went out (exact semantics of extractRelicTo are defined elsewhere).
130 comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
// Serve queued requests only while no relic is pending.
136 while(!msghdrRelicSize_ && toSendQueue_.size()) {
137 auto it = toSendQueue_.begin();
138 auto seq = get<0>(*it);
139 auto len = get<1>(*it);
// Reject the request when it starts before bufItNext_.seq_, asks for more
// items than the buffer can hold, or reaches past buffer_.readSeq(0).
140 if (hmbdc_unlikely(seq < bufItNext_.seq_
141 || len > buffer_.capacity()
142 || seq + len > buffer_.readSeq(0))) {
144 HMBDC_LOG_C(
id(),
", received bad data ", seq,
',', len,
" stopping connection");
148 if (hmbdc_unlikely(len)) {
150 size_t bytesToSend = 0;
// Gather items until the request is exhausted or toSend_ is one entry
// short of its capacity.
// NOTE(review): a MemorySeg item appends two iovecs and a flush_ entry is
// appended as well, so toSend_ may exceed the reserved capacity. Confirm
// that the relic iovec array (sized from the same hint) is large enough.
152 for (; len && toSend_.size() < toSend_.capacity() - 1; len--) {
153 void* ptr = *bufIt_++;
154 auto item =
static_cast<TransportMessageHeader*
>(ptr);
// ifResend is true when the client subscribes to the item's tag and, for
// MemorySeg / StartMemorySegTrain items, also to the in-band underlying tag.
// The lines that consume ifResend are missing from this listing.
156 bool ifResend = clientSubscriptions_.check(item->typeTag());
157 ifResend = ifResend && (item->typeTag() != MemorySeg::typeTag
158 || clientSubscriptions_.check(item->template wrapped<MemorySeg>().inbandUnderlyingTypeTag));
159 ifResend = ifResend && (item->typeTag() != StartMemorySegTrain::typeTag
160 || clientSubscriptions_.check(item->template wrapped<StartMemorySegTrain>().inbandUnderlyingTypeTag));
// A MemorySeg item goes out as two iovecs: the wire bytes minus the
// segment part, then the segment bytes themselves.
164 if (hmbdc_unlikely(item->typeTag() == MemorySeg::typeTag)) {
165 toSend_.push_back(iovec{(
void*)item->wireBytes(), item->wireSize() - item->wireSizeMemorySeg()});
166 toSend_.push_back(iovec{(
void*)item->wireBytesMemorySeg(), item->wireSizeMemorySeg()});
// Presumably the else branch: any other item goes out as a single iovec.
168 toSend_.push_back(iovec{(
void*)item->wireBytes(), item->wireSize()});
170 bytesToSend += item->wireSize();
// Append the header-sized flush_ block to the batch.
// NOTE(review): whether this sits inside or after the loop is not visible here.
172 toSend_.push_back(iovec{(
void*)flush_,
sizeof(flush_)});
173 bytesToSend +=
sizeof(flush_);
176 msghdr_.msg_iov = &(toSend_[0]);
177 msghdr_.msg_iovlen = toSend_.size();
// The counter advances by the number of iovec entries, so a MemorySeg
// counts twice and the flush_ entry is counted too.
178 sendBackupMessageCount_ += msghdr_.msg_iovlen;
179 auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
180 if (hmbdc_unlikely(l < 0)) {
181 if (!writeFd_.checkErr()) {
182 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
// Bytes of this batch that the kernel did not accept.
187 msghdrRelicSize_ = bytesToSend - size_t(l);
// Request fully gathered: drop it. Otherwise (presumably the else branch)
// advance the queued request's start by the number of items gathered.
188 if (hmbdc_likely(!len)) {
189 toSendQueue_.pop_front();
191 get<0>(*it) += get<1>(*it) - len;
// NOTE(review): the byte count is shifted left by 8 (x256) before rate
// accounting; the unit Rater::check expects is not visible here.
194 rater_.check(bytesToSend << 8u);
196 if (hmbdc_unlikely(msghdrRelicSize_)) {
// Partial send: derive msghdrRelic_ from msghdr_ for the retry above.
197 comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
// Presumably the len == 0 branch: record the request's seq in bufItNext_
// and discard the request.
202 bufItNext_.seq_ = get<0>(*it);
203 toSendQueue_.pop_front();
211 template <
typename TypeTagBackupSource,
typename Me>
// Reads bytes from the peer into data_ and handles one tab-terminated
// command, selected by its first character:
//   '+'  add a type tag to the client's subscriptions
//   '-'  remove a type tag from the client's subscriptions
//   '='  queue a resend request given as "seq,len"
// The return statements are missing from this listing; runOnce() stops
// when this function returns false.
214 bool doRead() HMBDC_RESTRICT {
215 if (readFd_.isFdReady()) {
// Append to whatever is already buffered.
// NOTE(review): if data_ is full and holds no '\t', the length passed is
// zero and recv returns 0, which is reported below as a dropped peer.
216 auto l = recv(readFd_.fd, data_ + filledLen_,
sizeof(data_) - filledLen_
217 , MSG_NOSIGNAL|MSG_DONTWAIT);
218 if (hmbdc_unlikely(l < 0)) {
219 if (!readFd_.checkErr()) {
220 HMBDC_LOG_C(
"recv failed errno=", errno);
223 }
else if (hmbdc_unlikely(l == 0)) {
224 HMBDC_LOG_W(
"peer dropped:",
id());
// Look for the command terminator in the buffered bytes.
231 auto p = find(data_, data_ + filledLen_,
'\t');
232 if (p != data_ + filledLen_) {
// NOTE(review): t is defined on lines missing from this listing. std::stoi
// throws on non-numeric or out-of-range text - confirm a caller handles
// that for peer-supplied input. The cast keeps only the low 16 bits.
238 else if (data_[0] ==
'+') {
239 auto tag = (uint16_t)std::stoi(t);
// Update the shared outbound set only when set() returns true.
240 if (clientSubscriptions_.set(tag)) {
241 outboundSubscriptions_.add(tag);
243 }
else if (data_[0] ==
'-') {
244 auto tag = (uint16_t)std::stoi(t);
// Update the shared outbound set only when unset() returns true.
245 if (clientSubscriptions_.unset(tag)) {
246 outboundSubscriptions_.erase(tag);
248 }
else if (data_[0] ==
'=') {
// Parse "seq,len" after the command character.
// NOTE(review): sscanf needs a NUL terminator - presumably the missing
// lines overwrite the '\t' with '\0'; confirm.
251 if (hmbdc_likely(2 == sscanf(data_ + 1,
"%" SCNu64
",%zu", &seq, &len))) {
// NOTE(review): push_back on a full boost::circular_buffer overwrites the
// oldest element rather than failing.
252 toSendQueue_.push_back(make_tuple(seq, len));
254 HMBDC_LOG_C(
id(),
", received bad data ", data_ + 1,
" stopping connection");
// Drop the handled command and its delimiter, moving the rest to the front.
258 memmove(data_, p + 1, filledLen_ - (p - data_ + 1));
259 filledLen_ -= p - data_ + 1;
// Data members (partial - other members are missing from this listing).
// Message buffer; runOnce() checks requests against its capacity() and readSeq(0).
268 Buffer const& HMBDC_RESTRICT buffer_;
// Rate accounting object; runOnce() calls check() once per batch.
269 Rater& HMBDC_RESTRICT rater_;
// 16 KiB receive buffer for commands from the peer.
274 char data_[16 * 1024];
// Pending (seq, len) resend requests queued by doRead().
280 ToSendQueue toSendQueue_;
// Bytes still unsent after the last sendmsg(); zero when nothing is pending.
283 size_t msghdrRelicSize_;
// Header-sized block that runOnce() appends to outgoing batches.
286 char flush_[
sizeof(TransportMessageHeader)];
// Running total of iovec entries handed to sendmsg(); logged at retirement.
287 size_t sendBackupMessageCount_;
292 template <
typename TransportMessageHeader>
Definition: BackupSendSessionT.hpp:42
Definition: TypedString.hpp:84
Definition: EpollTask.hpp:87
Definition: BackupSendServerT.hpp:75
Definition: LockFreeBufferMisc.hpp:89