1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/tips/tcpcast/Transport.hpp" 5 #include "hmbdc/tips/tcpcast/Messages.hpp" 6 #include "hmbdc/tips/TypeTagSet.hpp" 8 #include "hmbdc/time/Time.hpp" 9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 10 #include "hmbdc/comm/inet/Misc.hpp" 12 #include <boost/lexical_cast.hpp> 13 #include <boost/circular_buffer.hpp> 21 namespace hmbdc {
namespace tips {
namespace tcpcast {
23 namespace sendserver_detail {
27 using ToSend = std::vector<iovec>;
28 using ToSendQueue = boost::circular_buffer<std::tuple<uint16_t
35 namespace sendsession_detail {
39 using ptr = shared_ptr<SendSession>;
41 , ToSendQueue & toSendQueue
46 , toSendQueue_(toSendQueue)
47 , toSendQueueIndex_(0)
51 , outboundSubscriptions_(outboundSubscriptions) {
52 msghdrRelic_.msg_iov =
new iovec[maxSendBatch * 2];
53 auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
54 id_ = addrPort.first +
":" + std::to_string(addrPort.second);
55 auto forRead = dup(fd);
57 HMBDC_THROW(std::runtime_error,
"dup failed errno=" << errno);
62 utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
64 utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
65 HMBDC_LOG_N(
"SendSession started: ",
id());
69 clientSubscriptions_.exportTo([
this](uint16_t tag,
auto&&) {
70 outboundSubscriptions_.erase(tag);
72 delete [] msghdrRelic_.msg_iov;
73 HMBDC_LOG_N(
"SendSession retired: ",
id());
76 bool runOnce() HMBDC_RESTRICT {
77 if (hmbdc_unlikely(!doRead()))
return false;
78 if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
79 auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
80 if (hmbdc_unlikely(l < 0)) {
81 if (!writeFd_.checkErr()) {
82 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
87 msghdrRelicSize_ -= size_t(l);
88 if (hmbdc_unlikely(msghdrRelicSize_)) {
89 comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
95 for (; !msghdrRelicSize_ && writeFd_.isFdReady()
96 && toSendQueueIndex_ != toSendQueue_.size();) {
97 if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
98 msghdr_.msg_iov = &(get<1>(toSendQueue_[toSendQueueIndex_])[0]);
99 msghdr_.msg_iovlen = get<1>(toSendQueue_[toSendQueueIndex_]).size();
100 auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
101 if (hmbdc_unlikely(l < 0)) {
102 if (!writeFd_.checkErr()) {
103 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
108 msghdrRelicSize_ = get<2>(toSendQueue_[toSendQueueIndex_]) -
size_t(l);
109 if (hmbdc_unlikely(msghdrRelicSize_)) {
110 comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
119 char const* id()
const {
129 doRead() HMBDC_RESTRICT {
130 if (hmbdc_unlikely(readFd_.isFdReady())) {
131 auto l = recv(readFd_.fd, data_ + readLen_,
sizeof(data_) - readLen_, MSG_NOSIGNAL|MSG_DONTWAIT);
132 if (hmbdc_unlikely(l < 0)) {
133 if (!readFd_.checkErr()) {
134 HMBDC_LOG_C(
"recv failed errno=", errno);
142 auto p = find(data_, data_ + readLen_,
'\t');
143 if (p != data_ + readLen_) {
148 }
else if (data_[0] ==
'+') {
149 auto tag = (uint16_t)std::stoi(t);
150 if (clientSubscriptions_.set(tag)) {
151 outboundSubscriptions_.add(tag);
153 }
else if (data_[0] ==
'-') {
154 auto tag = (uint16_t)std::stoi(t);
155 if (clientSubscriptions_.unset(tag)) {
156 outboundSubscriptions_.erase(tag);
159 HMBDC_LOG_C(
"voilating protocle by ", id_);
162 memmove(data_, p + 1, readLen_ - (p - data_ + 1));
163 readLen_ -= p - data_ + 1;
172 char data_[16 * 1024];
178 ToSendQueue& toSendQueue_;
179 ToSendQueue::size_type toSendQueueIndex_;
182 size_t msghdrRelicSize_;
Definition: SendServer.hpp:25
Definition: SendSession.hpp:38
Definition: TypedString.hpp:84
Definition: EpollTask.hpp:87
Definition: LockFreeBufferMisc.hpp:89