hmbdc
simplify-high-performance-messaging-programming
SendSession.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/tips/tcpcast/Transport.hpp"
5 #include "hmbdc/tips/tcpcast/Messages.hpp"
6 #include "hmbdc/tips/TypeTagSet.hpp"
7 
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
10 #include "hmbdc/comm/inet/Misc.hpp"
11 
12 #include <boost/lexical_cast.hpp>
13 #include <boost/circular_buffer.hpp>
14 
15 #include <random>
16 #include <memory>
17 #include <utility>
18 #include <iostream>
19 #include <fcntl.h>
20 
21 namespace hmbdc { namespace tips { namespace tcpcast {
22 
23 namespace sendserver_detail {
24 struct SendServer;
25 }
26 
27 using ToSend = std::vector<iovec>;
28 using ToSendQueue = boost::circular_buffer<std::tuple<uint16_t //TypeTag
29  , ToSend
30  , size_t //total bytes above
32  >
33 >;
34 
35 namespace sendsession_detail {
36 using namespace std;
37 
38 struct SendSession {
39  using ptr = shared_ptr<SendSession>;
40  SendSession(int fd
41  , ToSendQueue & toSendQueue
42  , size_t maxSendBatch
43  , TypeTagSet& outboundSubscriptions)
44  : readLen_(0)
45  , ready_(false)
46  , toSendQueue_(toSendQueue)
47  , toSendQueueIndex_(0)
48  , msghdr_{0}
49  , msghdrRelic_{0}
50  , msghdrRelicSize_(0)
51  , outboundSubscriptions_(outboundSubscriptions) {
52  msghdrRelic_.msg_iov = new iovec[maxSendBatch * 2]; //double for attachment
53  auto addrPort = hmbdc::comm::inet::getPeerIpPort(fd);
54  id_ = addrPort.first + ":" + std::to_string(addrPort.second);
55  auto forRead = dup(fd);
56  if (forRead == -1) {
57  HMBDC_THROW(std::runtime_error, "dup failed errno=" << errno);
58  }
59 
60  readFd_.fd = forRead;
61  using namespace hmbdc::app;
62  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, readFd_);
63  writeFd_.fd = fd;
64  utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, writeFd_);
65  HMBDC_LOG_N("SendSession started: ", id());
66  }
67 
68  ~SendSession() {
69  clientSubscriptions_.exportTo([this](uint16_t tag, auto&&) {
70  outboundSubscriptions_.erase(tag);
71  });
72  delete [] msghdrRelic_.msg_iov;
73  HMBDC_LOG_N("SendSession retired: ", id());
74  }
75 
76  bool runOnce() HMBDC_RESTRICT {
77  if (hmbdc_unlikely(!doRead())) return false;
78  if (hmbdc_unlikely(writeFd_.isFdReady() && msghdrRelicSize_)) {
79  auto l = sendmsg(writeFd_.fd, &msghdrRelic_, MSG_NOSIGNAL|MSG_DONTWAIT);
80  if (hmbdc_unlikely(l < 0)) {
81  if (!writeFd_.checkErr()) {
82  HMBDC_LOG_C("sendmsg failed errno=", errno);
83  return false;
84  }
85  return true;
86  }
87  msghdrRelicSize_ -= size_t(l);
88  if (hmbdc_unlikely(msghdrRelicSize_)) {
89  comm::inet::extractRelicTo(msghdrRelic_, msghdrRelic_, l);
90  return true;
91  } else {
92  toSendQueueIndex_++;
93  }
94  }
95  for (; !msghdrRelicSize_ && writeFd_.isFdReady()
96  && toSendQueueIndex_ != toSendQueue_.size();) {
97  if (clientSubscriptions_.check(get<0>(toSendQueue_[toSendQueueIndex_]))) {
98  msghdr_.msg_iov = &(get<1>(toSendQueue_[toSendQueueIndex_])[0]);
99  msghdr_.msg_iovlen = get<1>(toSendQueue_[toSendQueueIndex_]).size();
100  auto l = sendmsg(writeFd_.fd, &msghdr_, MSG_NOSIGNAL|MSG_DONTWAIT);
101  if (hmbdc_unlikely(l < 0)) {
102  if (!writeFd_.checkErr()) {
103  HMBDC_LOG_C("sendmsg failed errno=", errno);
104  return false;
105  }
106  return true;
107  }
108  msghdrRelicSize_ = get<2>(toSendQueue_[toSendQueueIndex_]) - size_t(l);
109  if (hmbdc_unlikely(msghdrRelicSize_)) {
110  comm::inet::extractRelicTo(msghdrRelic_, msghdr_, l);
111  return true;
112  }
113  }
114  toSendQueueIndex_++;
115  }
116  return true;
117  }
118 
119  char const* id() const {
120  return id_.c_str();
121  }
122 
123  bool ready() const {
124  return ready_;
125  }
126 
127 private:
128  bool
129  doRead() HMBDC_RESTRICT {
130  if (hmbdc_unlikely(readFd_.isFdReady())) {
131  auto l = recv(readFd_.fd, data_ + readLen_, sizeof(data_) - readLen_, MSG_NOSIGNAL|MSG_DONTWAIT);
132  if (hmbdc_unlikely(l < 0)) {
133  if (!readFd_.checkErr()) {
134  HMBDC_LOG_C("recv failed errno=", errno);
135  return false;
136  }
137  } else {
138  readLen_ += l;
139  }
140  };
141  while (readLen_) {
142  auto p = find(data_, data_ + readLen_, '\t');
143  if (p != data_ + readLen_) {
144  *p = '\000';
145  string t(data_ + 1);
146  if (t.size() == 0) {
147  ready_ = true;
148  } else if (data_[0] == '+') {
149  auto tag = (uint16_t)std::stoi(t);
150  if (clientSubscriptions_.set(tag)) {
151  outboundSubscriptions_.add(tag);
152  }
153  } else if (data_[0] == '-') {
154  auto tag = (uint16_t)std::stoi(t);
155  if (clientSubscriptions_.unset(tag)) {
156  outboundSubscriptions_.erase(tag);
157  }
158  } else {
159  HMBDC_LOG_C("voilating protocle by ", id_);
160  return false;
161  }
162  memmove(data_, p + 1, readLen_ - (p - data_ + 1));
163  readLen_ -= p - data_ + 1;
164  } else {
165  break; //no complete type tag request
166  }
167  }
168  return true;
169  }
170  app::utils::EpollFd writeFd_;
171  app::utils::EpollFd readFd_;
172  char data_[16 * 1024];
173  size_t readLen_;
174  string id_;
175  bool ready_;
176 
178  ToSendQueue& toSendQueue_;
179  ToSendQueue::size_type toSendQueueIndex_;
180  msghdr msghdr_;
181  msghdr msghdrRelic_;
182  size_t msghdrRelicSize_;
183 
184  TypeTagSet clientSubscriptions_;
185  TypeTagSet& outboundSubscriptions_;
186 };
187 } //sendsession_detail
188 
190 
191 }}}
Definition: Base.hpp:12
Definition: TypedString.hpp:84
Definition: EpollTask.hpp:87
Definition: TypeTagSet.hpp:141
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89