hmbdc
simplify-high-performance-messaging-programming
SendServer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
8 #include "hmbdc/comm/inet/Misc.hpp"
9 
10 #include "hmbdc/app/tcpcast/SendSession.hpp"
11 
12 #include <unordered_set>
13 #include <memory>
14 #include <utility>
15 #include <iostream>
16 
17 #include <netinet/tcp.h>
18 
19 namespace hmbdc { namespace app { namespace tcpcast {
20 
21 namespace sendserver_detail {
22 using namespace hmbdc::pattern;
23 using namespace std;
24 
25 struct SendServer {
26  SendServer(Config const& cfg
27  , size_t toSendQueueMaxSize)
28  : config_(cfg)
29  , serverFd_(cfg)
30  , serverAddr_(serverFd_.localAddr)
31  , advertisingMessage_(
32  cfg.getExt<string>("topicRegex")
33  , serverFd_.localIp
34  , serverFd_.localPort
35  , config_.getExt<bool>("loopback")
36  )
37  , distribution_(1)
38  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
39  , connKeyCheck_(config_.getExt<bool>("connKeyCheck")) {
40  if (listen(serverFd_.fd, 10) < 0) {
41  HMBDC_THROW(runtime_error, "failed to listen, errno=" << errno);
42  }
43  HMBDC_LOG_N("listen at ", advertisingMessage_);
44  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, serverFd_);
45  toSendQueue_.set_capacity(toSendQueueMaxSize);
46  }
47 
48  TopicSource const& advertisingMessage() const {
49  advertisingMessage_.connKey = connKeyCheck_
50  ?distribution_(generator_)
51  :std::numeric_limits<decltype(advertisingMessage_.connKey)>::max();
52  return advertisingMessage_;
53  }
54 
55  void queue(pair<char const*, char const*> const& t
56  , ToSend && toSend
57  , size_t toSendByteSize
59  toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
60  }
61 
62  /**
63  * @brief run the server's async send function and decide which items in buffer
64  * can be release
65  * @details called all the time
66  *
67  * @param begin if nothing can be released in buffer return this
68  * @param end if all can be releases return this
69  *
70  * @return iterator in buffer poing to new start (not released)
71  */
73  , MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT {
74  doAccept();
75  if (sessions_.size() == 0) {
76  toSendQueue_.clear();
77  return end;
78  }
79  //nothing to retire by default
80  auto newStartIt = begin;
81 
82  auto minIndex = toSendQueue_.size();
83  for (auto it = sessions_.begin(); it != sessions_.end();) {
84  if (minIndex > (*it)->toSendQueueIndex_) {
85  minIndex = (*it)->toSendQueueIndex_;
86  }
87  if (hmbdc_unlikely(!(*it)->runOnce())) {
88  sessions_.erase(it++);
89  } else {
90  it++;
91  }
92  }
93  if (minIndex) {
94  newStartIt = get<3>(toSendQueue_[minIndex - 1]);
95  toSendQueue_.erase_begin(minIndex);
96  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
97  (*it)->toSendQueueIndex_ -= minIndex;
98  }
99  }
100 
101  return newStartIt;
102  }
103 
104  size_t readySessionCount() const {
105  size_t res = 0;
106  for (auto const& s : sessions_) {
107  if (s->ready()) res++;
108  }
109  return res;
110  }
111 
112  void killSlowestSession() {
113  for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
114  if (0 == (*it)->toSendQueueIndex_) {
115  HMBDC_LOG_C((*it)->id(), " too slow, dropping");
116  sessions_.erase(it);
117  break;
118  }
119  }
120  }
121 
122 private:
123  void doAccept() HMBDC_RESTRICT {
124  if (hmbdc_unlikely(serverFd_.isFdReady())) {
125  auto addrlen = sizeof(serverAddr_);
126  auto conn = accept(serverFd_.fd, (struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
127  if (conn == -1) {
128  if (!serverFd_.checkErr()) {
129  HMBDC_LOG_C("accept failure, errno=", errno);
130  }
131  return;
132  }
133  auto sz = config_.getExt<int>("tcpSendBufferBytes");
134  if (sz) {
135  if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)) < 0) {
136  HMBDC_LOG_C("failed to set send buffer size=", sz, " errno=", errno);
137  }
138  }
139  int flag = config_.getExt<bool>("nagling")?0:1;
140  if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)) < 0) {
141  HMBDC_LOG_C("failed to set TCP_NODELAY, errno=", errno);
142  }
143  try {
144  auto k = advertisingMessage_.connKey;
145  auto s = std::make_shared<SendSession>(conn, k, toSendQueue_, maxSendBatch_);
146  sessions_.insert(s);
147  } catch (std::exception const& e) {
148  HMBDC_LOG_C(e.what());
149  }
150  }
151  }
152 
// NOTE: the constructor's initializer list relies on this declaration order.

// configuration passed to the constructor; held by reference
153  Config const& config_;
// sessions created in doAccept(); removed when a session's runOnce() returns
// false or by killSlowestSession()
154  using Sessions = unordered_set<SendSession::ptr>;
155  Sessions sessions_;
// pending items to send; capacity is set in the constructor and the queue is
// handed to every SendSession on creation
156  ToSendQueue toSendQueue_;
// listening socket, constructed from the config
157  EpollFd serverFd_;
// bound to serverFd_.localAddr; also passed to accept() as the address buffer
158  sockaddr_in& serverAddr_;
// mutable: advertisingMessage() is const but refreshes connKey on every call
159  mutable TopicSource advertisingMessage_;
// source of random connKey values; default-constructed, not seeded in this struct
160  mutable std::default_random_engine generator_;
// constructed with lower bound 1 in the constructor
161  mutable std::uniform_int_distribution<uint64_t> distribution_;
162 
// config "maxSendBatch", forwarded to each new SendSession
163  size_t maxSendBatch_;
// config "connKeyCheck": when false, connKey is fixed at its type's max value
164  bool connKeyCheck_;
165 };
166 } //sendserver_detail
168 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:226
class to hold an hmbdc configuration
Definition: Config.hpp:46
Definition: Topic.hpp:44
Definition: BlockingBuffer.hpp:10
MonoLockFreeBuffer::iterator runOnce(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server's async send function and decide which items in buffer can be released ...
Definition: SendServer.hpp:72
Definition: Transport.hpp:24
Definition: Messages.hpp:83
Definition: Base.hpp:13
Definition: LockFreeBufferMisc.hpp:89