hmbdc
simplify-high-performance-messaging-programming
SendServer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/tips/tcpcast/Transport.hpp"
5 #include "hmbdc/tips/tcpcast/Messages.hpp"
6 #include "hmbdc/tips/tcpcast/SendSession.hpp"
7 #include "hmbdc/time/Time.hpp"
8 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
9 #include "hmbdc/comm/inet/Misc.hpp"
10 
11 #include <unordered_set>
12 #include <memory>
13 #include <utility>
14 #include <iostream>
15 
16 #include <netinet/tcp.h>
17 
18 namespace hmbdc { namespace tips { namespace tcpcast {
19 
20 namespace sendserver_detail {
21 using namespace hmbdc::pattern;
22 using namespace hmbdc::app;
23 using namespace std;
24 
25 struct SendServer {
26  SendServer(Config const& cfg
27  , size_t toSendQueueMaxSize
28  , TypeTagSet& outboundSubscriptions)
29  : config_(cfg)
30  , serverFd_(cfg)
31  , serverAddr_(serverFd_.localAddr)
32  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
33  , outboundSubscriptions_(outboundSubscriptions) {
34  if (listen(serverFd_.fd, 10) < 0) {
35  HMBDC_THROW(runtime_error, "failed to listen, errno=" << errno);
36  }
37  utils::EpollTask::instance().add(
38  utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, serverFd_);
39  toSendQueue_.set_capacity(toSendQueueMaxSize);
40  }
41 
42  void advertisingMessages(TypeTagSetST& tts) {
43  decltype(advertisingMessages_) newAds;
44  tts.exportTo([&](uint16_t tag, auto&& subCount) {
45  if (newAds.size() == 0
46  || !newAds.rbegin()->addTypeTag(tag)) {
47  newAds.emplace_back(serverFd_.localIp
48  , serverFd_.localPort
49  , config_.getExt<bool>("loopback"));
50  newAds.rbegin()->addTypeTag(tag);
51  }
52  });
53  std::swap(advertisingMessages_, newAds);
54  for (auto& m : advertisingMessages_) {
55  HMBDC_LOG_N("advertise at ", m);
56  }
57  }
58 
59  auto const& advertisingMessages() const {
60  return advertisingMessages_;
61  }
62 
63  void queue(uint16_t tag
64  , ToSend && toSend
65  , size_t toSendByteSize
67  toSendQueue_.push_back(std::forward_as_tuple(tag, std::forward<ToSend>(toSend), toSendByteSize, it));
68  }
69 
70  /**
71  * @brief run the server's async send function and decide which items in buffer
72  * can be release
73  * @details called all the time
74  *
75  * @param begin if nothing can be released in buffer return this
76  * @param end if all can be releases return this
77  *
78  * @return iterator in buffer poing to new start (not released)
79  */
81  , MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT {
82  doAccept();
83  if (sessions_.size() == 0) {
84  toSendQueue_.clear();
85  return end;
86  }
87  //nothing to retire by default
88  auto newStartIt = begin;
89 
90  auto minIndex = toSendQueue_.size();
91  for (auto it = sessions_.begin(); it != sessions_.end();) {
92  if (minIndex > (*it)->toSendQueueIndex_) {
93  minIndex = (*it)->toSendQueueIndex_;
94  }
95  if (hmbdc_unlikely(!(*it)->runOnce())) {
96  sessions_.erase(it++);
97  } else {
98  it++;
99  }
100  }
101  if (minIndex) {
102  newStartIt = get<3>(toSendQueue_[minIndex - 1]);
103  toSendQueue_.erase_begin(minIndex);
104  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
105  (*it)->toSendQueueIndex_ -= minIndex;
106  }
107  }
108 
109  return newStartIt;
110  }
111 
112  size_t readySessionCount() const {
113  size_t res = 0;
114  for (auto const& s : sessions_) {
115  if (s->ready()) res++;
116  }
117  return res;
118  }
119 
120  void killSlowestSession() {
121  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
122  if (0 == (*it)->toSendQueueIndex_) {
123  HMBDC_LOG_C((*it)->id(), " too slow, dropping");
124  sessions_.erase(it);
125  break;
126  }
127  }
128  }
129 
130 private:
131  void doAccept() HMBDC_RESTRICT {
132  if (hmbdc_unlikely(serverFd_.isFdReady())) {
133  auto addrlen = sizeof(serverAddr_);
134  auto conn = accept(serverFd_.fd, (struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
135  if (conn == -1) {
136  if (!serverFd_.checkErr()) {
137  HMBDC_LOG_C("accept failure, errno=", errno);
138  }
139  return;
140  }
141  auto sz = config_.getExt<int>("tcpSendBufferBytes");
142  if (sz) {
143  if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)) < 0) {
144  HMBDC_LOG_C("failed to set send buffer size=", sz, " errno=", errno);
145  }
146  }
147  int flag = config_.getExt<bool>("nagling")?0:1;
148  if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)) < 0) {
149  HMBDC_LOG_C("failed to set TCP_NODELAY, errno=", errno);
150  }
151  try {
152  auto s = std::make_shared<SendSession>(
153  conn, toSendQueue_, maxSendBatch_, outboundSubscriptions_);
154  sessions_.insert(s);
155  } catch (std::exception const& e) {
156  HMBDC_LOG_C(e.what());
157  }
158  }
159  }
160 
161  Config const& config_;
162  using Sessions = unordered_set<SendSession::ptr>;
163  Sessions sessions_;
164  ToSendQueue toSendQueue_;
165  EpollFd serverFd_;
166  sockaddr_in& serverAddr_;
167  mutable std::vector<TypeTagSource> advertisingMessages_;
168 
169  size_t maxSendBatch_;
170  TypeTagSet& outboundSubscriptions_;
171 };
172 } //sendserver_detail
173 using SendServer = sendserver_detail::SendServer;
174 }}}
MonoLockFreeBuffer::iterator runOnce(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server&#39;s async send function and decide which items in buffer can be release ...
Definition: SendServer.hpp:80
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: TypedString.hpp:84
Definition: BlockingBuffer.hpp:11
Definition: TypeTagSet.hpp:141
Definition: TypeTagSet.hpp:201
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89