1 #include "hmbdc/Copyright.hpp"
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/os/DownloadFile.hpp"
8 #include "hmbdc/os/DownloadMemory.hpp"
9 #include "hmbdc/comm/inet/Misc.hpp"
11 #include "hmbdc/app/tcpcast/RecvSession.hpp"
13 #include <boost/lexical_cast.hpp>
22 namespace hmbdc {
namespace app {
namespace tcpcast {
24 namespace recvsession_detail {
26 using namespace hmbdc::text;
28 template <
typename OutputBuffer,
typename MsgArbitrator>
30 using ptr = shared_ptr<RecvSession<OutputBuffer, MsgArbitrator>>;
31 using CleanupFunc = std::function<void()>;
34 , std::regex
const& topicRegex
35 , OutputBuffer& outputBuffer
39 , subscriptions_(subscriptions)
40 , topicRegex_(topicRegex)
43 , outputBuffer_(outputBuffer)
47 , bufSize_(config.
getExt<
size_t>(
"maxTcpReadBytes"))
48 , buf_((
char*)memalign(SMP_CACHE_BYTES, bufSize_))
51 , currTransportHeadFlag_(0) {
52 config(allowRecvMemoryAttachment_,
"allowRecvMemoryAttachment");
57 HMBDC_LOG_N(
"RecvSession retired: ",
id());
// Starts the session: initiates a TCP connect on writeFd_ to ip:port, duplicates
// the descriptor, and registers writeFd_ / readFd_ with the EpollTask singleton.
// NOTE: this listing is missing several original lines (closing braces, the
// check around dup()); comments describe only what is visible.
//
// ip   - stored into sin_addr.s_addr unchanged (presumably already in network
//        byte order - confirm against callers)
// port - converted with htons() before use, so it is taken in host byte order
60 void start(in_addr_t ip, uint16_t port) {
61 sockaddr_in remoteAddr = {0};
62 remoteAddr.sin_family = AF_INET;
63 remoteAddr.sin_addr.s_addr = ip;
64 remoteAddr.sin_port = htons(port);
// A negative connect() result only raises when errno is not EINPROGRESS, i.e.
// an in-progress connect is tolerated (presumably writeFd_ is a non-blocking
// socket - TODO confirm where it is created).
65 if (connect(writeFd_.fd, (sockaddr*)&remoteAddr,
sizeof(remoteAddr)) < 0) {
66 if (errno != EINPROGRESS) {
67 HMBDC_THROW(runtime_error,
"connect fail, errno=" << errno);
// Second descriptor for the same socket; the lines that test forRead and store
// it are not visible here (presumably it becomes readFd_ - TODO confirm).
71 auto forRead = dup(writeFd_.fd);
73 HMBDC_THROW(std::runtime_error,
"dup failed errno=" << errno);
// Edge-triggered (EPOLLET) registrations: writeFd_ for EPOLLOUT, readFd_ for EPOLLIN.
77 utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
78 utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
82 if (currTransportHeadFlag_ == hasMemoryAttachment::flag) {
84 outputBuffer_.commit(itPending_);
85 currTransportHeadFlag_ = 0;
87 outputBuffer_.putSome(sessionDropped_);
// Sends a subscribe request for topic t to the peer over writeFd_, but only when
// t matches topicRegex_ (the match is hinted as the unlikely case).
// NOTE: the declaration and framing of buf are on lines missing from this listing.
90 void sendSubscribe(
Topic const& t) {
91 if (hmbdc_unlikely(std::regex_match(t.c_str(), topicRegex_))) {
// The topic text is copied starting at buf[1]; l is the value copyTo returns
// (presumably the copied length, with buf[0] and the byte after the topic
// holding framing characters - TODO confirm against the missing lines).
94 auto l = int(t.copyTo(buf + 1));
// Exactly l + 2 bytes must be accepted by send(); MSG_NOSIGNAL suppresses SIGPIPE.
// NOTE(review): unlike sendUnsubscribe, the failure log here does not include errno.
96 if (hmbdc_unlikely(send(writeFd_.fd, buf, l + 2, MSG_NOSIGNAL) != l + 2)) {
97 HMBDC_LOG_C(
"error when sending subscribe to ",
id());
// Sends an unsubscribe request for topic t to the peer over writeFd_, but only
// when t matches topicRegex_ (the match is hinted as the unlikely case).
// NOTE: the declaration and framing of buf are on lines missing from this listing.
104 void sendUnsubscribe(
Topic const& t) {
105 if (hmbdc_unlikely(std::regex_match(t.c_str(), topicRegex_))) {
// The topic text is copied starting at buf[1]; l is the value copyTo returns
// (presumably the copied length - TODO confirm against the missing lines).
108 auto l = int(t.copyTo(buf + 1));
// A send() that does not accept exactly l + 2 bytes is logged together with
// errno; MSG_NOSIGNAL suppresses SIGPIPE.
110 if (hmbdc_unlikely(l + 2 != send(writeFd_.fd, buf, l + 2, MSG_NOSIGNAL))) {
111 HMBDC_LOG_C(
"error when sending unsubscribe to ",
id(),
" errno=", errno);
118 if (!initialized_)
return;
119 char const* hb =
"+\t";
120 if (hmbdc_unlikely(2 != send(writeFd_.fd, hb, 2, MSG_NOSIGNAL))){
121 HMBDC_LOG_C(
"error when sending heartbeat to ",
id(),
" errno=", errno);
126 char const* id()
const {
131 if (hmbdc_unlikely(stopped_))
return false;
132 if (hmbdc_unlikely(!initialized_ && writeFd_.isFdReady())) {
135 }
catch (std::exception
const& e) {
136 HMBDC_LOG_W(e.what());
139 HMBDC_LOG_C(
"unknown exception");
// Finishes connection setup: clears O_NONBLOCK on writeFd_, applies the
// configured receive buffer size to readFd_, derives the session id from the
// peer address, and publishes sessionStarted_ to outputBuffer_.
// NOTE: some original lines (closing braces etc.) are missing from this listing.
147 void initializeConn() {
// Clear O_NONBLOCK on writeFd_ so later operations on it block; a failing
// F_SETFL throws.
148 int flags = fcntl(writeFd_.fd, F_GETFL, 0);
149 flags &= ~O_NONBLOCK;
150 if (fcntl(writeFd_.fd, F_SETFL, flags) < 0) {
151 HMBDC_THROW(std::runtime_error,
"fcntl failed errno=" << errno);
// Receive buffer size comes from the "tcpRecvBufferBytes" config entry and is
// applied through readFd_.
154 auto sz = config_.getExt<
int>(
"tcpRecvBufferBytes");
// NOTE(review): the option being set is SO_RCVBUF, yet the log text below says
// "send buffer" - looks like a copy/paste slip in the message; confirm and fix.
// The failure is logged here; no throw is visible in this fragment.
156 if (setsockopt(readFd_.fd, SOL_SOCKET, SO_RCVBUF, &sz,
sizeof(sz)) < 0) {
157 HMBDC_LOG_C(
"failed to set send buffer size=", sz);
// Session id is "<first>:<second>" of the pair returned by getPeerIpPort
// (presumably peer ip and port, per the helper's name).
161 auto addrPort = hmbdc::comm::inet::getPeerIpPort(writeFd_.fd);
162 id_ = addrPort.first +
":" + std::to_string(addrPort.second);
// The id string is copied into both session messages' payload.ip fields.
// strncpy does not terminate on truncation, so the last byte is forced to 0.
163 strncpy(sessionStarted_.payload.ip, id_.c_str()
164 ,
sizeof(sessionStarted_.payload.ip));
165 sessionStarted_.payload.ip[
sizeof(sessionStarted_.payload.ip) - 1] = 0;
166 strncpy(sessionDropped_.payload.ip, id_.c_str()
167 ,
sizeof(sessionDropped_.payload.ip));
168 sessionDropped_.payload.ip[
sizeof(sessionDropped_.payload.ip) - 1] = 0;
// Publish the session-start message to outputBuffer_ and log the new session id.
169 outputBuffer_.putSome(sessionStarted_);
170 HMBDC_LOG_N(
"RecvSession started: ",
id());
// Builds one text message - '@' + connKey_ + '\t', then content produced inside
// the loop over subscriptions_, then "+\t" - and sends it to the peer on writeFd_.
// NOTE: the declaration of oss and the statements inside the regex-matched
// branch are on lines missing from this listing.
175 void sendSubscriptions() {
177 oss <<
'@' << connKey_ <<
'\t';
// Only entries whose key (it->first) matches topicRegex_ enter the branch below.
178 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
179 auto const& t = it->first;
180 if (std::regex_match(t.c_str(), topicRegex_)) {
// The trailing "+\t" is the same two bytes the heartbeat code in this file sends.
188 auto s = oss.str() +
"+\t";
// send()'s result is cast to size_t, so a -1 failure becomes a huge value that
// cannot equal s.size(); both errors and short writes reach the log below.
189 auto sz = size_t(send(writeFd_.fd, s.c_str(), s.size(), MSG_NOSIGNAL));
190 if (hmbdc_unlikely(sz != s.size())) {
191 HMBDC_LOG_C(
"error when sending subscriptions to ",
id());
198 if (hmbdc_unlikely(currTransportHeadFlag_ == hasMemoryAttachment::flag
200 auto s = memoryAttachment_.write(bufCur_, filledLen_);
201 if (memoryAttachment_.writeDone()) {
202 memoryAttachment_.close();
203 currTransportHeadFlag_ = 0;
204 outputBuffer_.commit(itPending_);
209 while (currTransportHeadFlag_ == 0
212 auto wireSize = h->wireSize();
213 if (hmbdc_likely(filledLen_ >= wireSize)) {
214 if (hmbdc_likely(subscriptions_.check(h->topic()))) {
216 if (hmbdc_likely(a > 0)) {
217 if (hmbdc_unlikely(h->flag == hasMemoryAttachment::flag
218 && allowRecvMemoryAttachment_.find(h->typeTag()) != allowRecvMemoryAttachment_.end())) {
219 currTransportHeadFlag_ = h->flag;
220 itPending_ = outputBuffer_.claim();
221 char* b =
static_cast<char*
>(*itPending_);
222 auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
223 memcpy(b, h->payload(), l);
226 att.hasMemoryAttachment::attachment = memoryAttachment_.open(att.len);
227 att.hasMemoryAttachment::afterConsumedCleanupFunc = hasMemoryAttachment::free;
228 }
else if (h->flag != hasMemoryAttachment::flag) {
229 auto l = std::min<size_t>(outputBuffer_.maxItemSize(), h->messagePayloadLen);
230 outputBuffer_.put(h->payload(), l);
231 currTransportHeadFlag_ = 0;
233 }
else if (hmbdc_unlikely(a == 0)) {
241 filledLen_ -= wireSize;
246 memmove(buf_, bufCur_, filledLen_);
249 if (readFd_.isFdReady()) {
250 auto l = recv(readFd_.fd, buf_ + filledLen_, bufSize_ - filledLen_
251 , MSG_NOSIGNAL|MSG_DONTWAIT);
252 if (hmbdc_unlikely(l < 0)) {
253 if (hmbdc_unlikely(!readFd_.checkErr())) {
254 HMBDC_LOG_C(
"recv failed errno=", errno);
257 }
else if (hmbdc_unlikely(l == 0)) {
258 HMBDC_LOG_W(
"peer dropped:",
id());
264 }
while (filledLen_);
270 std::regex topicRegex_;
274 OutputBuffer& outputBuffer_;
281 size_t const bufSize_;
285 uint8_t currTransportHeadFlag_;
286 typename OutputBuffer::iterator itPending_;
288 std::unordered_set<uint16_t> allowRecvMemoryAttachment_;
292 template <
typename OutputBuffer,
typename MsgArbitrator>
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: EpollTask.hpp:70
Definition: DownloadMemory.hpp:10
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:225
Definition: Transport.hpp:24
Definition: RecvSession.hpp:29
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:112