1 #include "hmbdc/Copyright.hpp"
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
7 #include "hmbdc/text/StringTrieSet.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
10 #include "hmbdc/comm/inet/Misc.hpp"
12 #include <boost/lexical_cast.hpp>
13 #include <boost/circular_buffer.hpp>
21 namespace hmbdc {
namespace app {
namespace tcpcast {
23 namespace sendserver_detail {
27 using ToSend = std::vector<iovec>;
28 using ToSendQueue = boost::circular_buffer<std::tuple<std::pair<char const*, char const*>
35 namespace sendsession_detail {
39 using ptr = shared_ptr<SendSession>;
42 , ToSendQueue & toSendQueue
43 ,
size_t maxSendBatch)
47 , toSendQueue_(toSendQueue)
48 , toSendQueueIndex_(0)
51 , msghdrRelicSize_(0) {
52 msghdrRelic_.msg_iov =
new iovec[maxSendBatch * 2];
53 auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
54 id_ = addrPort.first +
":" + std::to_string(addrPort.second);
55 auto forRead = dup(fd);
57 HMBDC_THROW(std::runtime_error,
"dup failed errno=" << errno);
61 utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
63 utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
64 HMBDC_LOG_N(
"SendSession started: ",
id());
68 delete [] msghdrRelic_.msg_iov;
69 HMBDC_LOG_N(
"SendSession retired: ",
id());
72 bool runOnce() HMBDC_RESTRICT {
73 if (hmbdc_unlikely(!doRead()))
return false;
74 if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
75 auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
76 if (hmbdc_unlikely(l < 0)) {
77 if (!writeFd_.checkErr()) {
78 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
83 msghdrRelicSize_ -= size_t(l);
84 if (hmbdc_unlikely(msghdrRelicSize_)) {
85 comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
91 for (; !msghdrRelicSize_ && writeFd_.isFdReady()
92 && toSendQueueIndex_ != toSendQueue_.size();) {
93 if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
94 msghdr_.msg_iov = &(get<1>(toSendQueue_[toSendQueueIndex_])[0]);
95 msghdr_.msg_iovlen = get<1>(toSendQueue_[toSendQueueIndex_]).size();
96 auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
97 if (hmbdc_unlikely(l < 0)) {
98 if (!writeFd_.checkErr()) {
99 HMBDC_LOG_C(
"sendmsg failed errno=", errno);
104 msghdrRelicSize_ = get<2>(toSendQueue_[toSendQueueIndex_]) -
size_t(l);
105 if (hmbdc_unlikely(msghdrRelicSize_)) {
106 comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
115 char const* id()
const {
125 doRead() HMBDC_RESTRICT {
126 if (hmbdc_unlikely(readFd_.isFdReady())) {
127 auto l = recv(readFd_.fd, data_ + readLen_,
sizeof(data_) - readLen_, MSG_NOSIGNAL|MSG_DONTWAIT);
128 if (hmbdc_unlikely(l < 0)) {
129 if (!readFd_.checkErr()) {
130 HMBDC_LOG_C(
"recv failed errno=", errno);
138 auto p = find(data_, data_ + readLen_,
'\t');
139 if (p != data_ + readLen_) {
142 if (hmbdc_unlikely(connKey_)) {
143 if (data_[0] !=
'@' || stoull(t) != connKey_) {
144 HMBDC_LOG_C(
"invalid connection attempted by ", id_);
148 }
else if (t.size() == 0) {
151 else if (data_[0] ==
'+') {
152 clientSubscriptions_.add(t);
153 }
else if (data_[0] ==
'-') {
154 clientSubscriptions_.erase(t);
156 HMBDC_LOG_C(
"voilating protocle by ", id_);
159 memmove(data_, p + 1, readLen_ - (p - data_ + 1));
160 readLen_ -= p - data_ + 1;
170 char data_[16 * 1024];
176 ToSendQueue& toSendQueue_;
177 ToSendQueue::size_type toSendQueueIndex_;
180 size_t msghdrRelicSize_;
Definition: SendSession.hpp:38
Definition: SendServer.hpp:25
Definition: EpollTask.hpp:70
Definition: StringTrieSetDetail.hpp:115
Definition: LockFreeBufferMisc.hpp:89