hmbdc
simplify-high-performance-messaging-programming
 All Classes Namespaces Functions Variables Friends Pages
SendSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 
7 #include "hmbdc/text/StringTrieSet.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
10 #include "hmbdc/comm/inet/Misc.hpp"
11 
12 #include <boost/lexical_cast.hpp>
13 #include <boost/circular_buffer.hpp>
14 
15 #include <random>
16 #include <memory>
17 #include <utility>
18 #include <iostream>
19 #include <fcntl.h>
20 
21 namespace hmbdc { namespace app { namespace tcpcast {
22 
23 namespace sendserver_detail {
24 struct SendServer;
25 }
26 
27 using ToSend = std::vector<iovec>;
28 using ToSendQueue = boost::circular_buffer<std::tuple<std::pair<char const*, char const*> //topic
29  , ToSend
30  , size_t //total bytes above
32  >
33 >;
34 
35 namespace sendsession_detail {
36 using namespace std;
37 
38 struct SendSession {
39  using ptr = shared_ptr<SendSession>;
40  SendSession(int fd
41  , uint64_t connKey
42  , ToSendQueue & toSendQueue
43  , size_t maxSendBatch)
44  : connKey_(connKey)
45  , readLen_(0)
46  , ready_(false)
47  , toSendQueue_(toSendQueue)
48  , toSendQueueIndex_(0)
49  , msghdr_{0}
50  , msghdrRelic_{0}
51  , msghdrRelicSize_(0) {
52  msghdrRelic_.msg_iov = new iovec[maxSendBatch * 2]; //double for attachment
53  auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
54  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
55  auto forRead = dup(fd);
56  if (forRead == -1) {
57  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
58  }
59 
60  readFd_.fd = forRead;
61  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
62  writeFd_.fd = fd;
63  utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
64  HMBDC_LOG_N("SendSession started: ", id());
65  }
66 
67  ~SendSession() {
68  delete [] msghdrRelic_.msg_iov;
69  HMBDC_LOG_N("SendSession retired: ", id());
70  }
71 
72  bool runOnce() HMBDC_RESTRICT {
73  if (hmbdc_unlikely(!doRead())) return false;
74  if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
75  auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
76  if (hmbdc_unlikely(l < 0)) {
77  if (!writeFd_.checkErr()) {
78  HMBDC_LOG_C("sendmsg failed errno=", errno);
79  return false;
80  }
81  return true;
82  }
83  msghdrRelicSize_ -= size_t(l);
84  if (hmbdc_unlikely(msghdrRelicSize_)) {
85  comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
86  return true;
87  } else {
88  toSendQueueIndex_++;
89  }
90  }
91  for (; !msghdrRelicSize_ && writeFd_.isFdReady()
92  && toSendQueueIndex_ != toSendQueue_.size();) {
93  if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
94  msghdr_.msg_iov = &(get<1>(toSendQueue_[toSendQueueIndex_])[0]);
95  msghdr_.msg_iovlen = get<1>(toSendQueue_[toSendQueueIndex_]).size();
96  auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
97  if (hmbdc_unlikely(l < 0)) {
98  if (!writeFd_.checkErr()) {
99  HMBDC_LOG_C("sendmsg failed errno=", errno);
100  return false;
101  }
102  return true;
103  }
104  msghdrRelicSize_ = get<2>(toSendQueue_[toSendQueueIndex_]) - size_t(l);
105  if (hmbdc_unlikely(msghdrRelicSize_)) {
106  comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
107  return true;
108  }
109  }
110  toSendQueueIndex_++;
111  }
112  return true;
113  }
114 
115  char const* id() const {
116  return id_.c_str();
117  }
118 
119  bool ready() const {
120  return ready_;
121  }
122 
123 private:
124  bool
125  doRead() HMBDC_RESTRICT {
126  if (hmbdc_unlikely(readFd_.isFdReady())) {
127  auto l = recv(readFd_.fd, data_ + readLen_, sizeof(data_) - readLen_, MSG_NOSIGNAL|MSG_DONTWAIT);
128  if (hmbdc_unlikely(l < 0)) {
129  if (!readFd_.checkErr()) {
130  HMBDC_LOG_C("recv failed errno=", errno);
131  return false;
132  }
133  } else {
134  readLen_ += l;
135  }
136  };
137  while (readLen_) {
138  auto p = find(data_, data_ + readLen_, '\t');
139  if (p != data_ + readLen_) {
140  *p = '\000';
141  string t(data_ + 1);
142  if (hmbdc_unlikely(connKey_)) {
143  if (data_[0] != '@' || stoull(t) != connKey_) {
144  HMBDC_LOG_C("invalid connection attempted by ", id_);
145  return false;
146  }
147  connKey_ = 0;
148  } else if (t.size() == 0) {
149  ready_ = true;
150  }
151  else if (data_[0] == '+') {
152  clientSubscriptions_.add(t);
153  } else if (data_[0] == '-') {
154  clientSubscriptions_.erase(t);
155  } else {
156  HMBDC_LOG_C("voilating protocle by ", id_);
157  return false;
158  }
159  memmove(data_, p + 1, readLen_ - (p - data_ + 1));
160  readLen_ -= p - data_ + 1;
161  } else {
162  break; //no complete topic request
163  }
164  }
165  return true;
166  }
167  uint64_t connKey_;
168  utils::EpollFd writeFd_;
169  utils::EpollFd readFd_;
170  char data_[16 * 1024];
171  size_t readLen_;
172  string id_;
173  bool ready_;
174 
176  ToSendQueue& toSendQueue_;
177  ToSendQueue::size_type toSendQueueIndex_;
178  msghdr msghdr_;
179  msghdr msghdrRelic_;
180  size_t msghdrRelicSize_;
181 
182  text::StringTrieSet clientSubscriptions_;
183 };
184 } //sendsession_detail
185 
187 
188 }}}
Definition: EpollTask.hpp:70
Definition: StringTrieSetDetail.hpp:115
Definition: LockFreeBufferMisc.hpp:89