1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/app/tcpcast/Transport.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/time/Time.hpp" 7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 8 #include "hmbdc/comm/inet/Misc.hpp" 10 #include "hmbdc/app/tcpcast/SendSession.hpp" 12 #include <unordered_set> 17 #include <netinet/tcp.h> 19 namespace hmbdc {
namespace app {
namespace tcpcast {
21 namespace sendserver_detail {
// Visible tail of a constructor; its name and leading parameters are on lines
// that are not part of this listing. The initializer list reads settings via
// getExt ("topicRegex", "loopback", "maxSendBatch", "connKeyCheck"); the body
// starts listening on serverFd_, registers it with the epoll task and sizes
// the send queue.
27 ,
size_t toSendQueueMaxSize)
30 , serverAddr_(serverFd_.localAddr)
31 , advertisingMessage_(
32 cfg.
getExt<
string>(
"topicRegex")
35 , config_.getExt<
bool>(
"loopback")
38 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch"))
39 , connKeyCheck_(config_.getExt<
bool>(
"connKeyCheck")) {
// listen() uses a fixed backlog of 10; a negative return is reported through
// HMBDC_THROW with the errno value.
40 if (listen(serverFd_.fd, 10) < 0) {
41 HMBDC_THROW(runtime_error,
"failed to listen, errno=" << errno);
43 HMBDC_LOG_N(
"listen at ", advertisingMessage_);
// The listening fd is added with the EPOLLIN and EPOLLET flags.
44 utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, serverFd_);
// The queue capacity is taken from the toSendQueueMaxSize parameter.
45 toSendQueue_.set_capacity(toSendQueueMaxSize);
// Body of an accessor whose signature is not visible in this listing.
// connKey is refreshed on every call before the message is returned: a value
// drawn from distribution_(generator_) when connKeyCheck_ is set, otherwise
// the maximum value representable by connKey's type.
49 advertisingMessage_.connKey = connKeyCheck_
50 ?distribution_(generator_)
51 :std::numeric_limits<decltype(advertisingMessage_.connKey)>::max();
52 return advertisingMessage_;
// Appends one entry to toSendQueue_: a tuple made of t, the forwarded toSend,
// toSendByteSize and it. The declarations of toSend and it (and the rest of
// the signature) are on lines missing from this listing.
55 void queue(pair<char const*, char const*>
const& t
57 ,
size_t toSendByteSize
59 toSendQueue_.push_back(forward_as_tuple(t, forward<ToSend>(toSend), toSendByteSize, it));
// Interior of a routine whose signature is not visible here (presumably
// runOnce(begin, end) - confirm against the full header). It drives every
// session once, drops sessions that report failure, then trims the entries at
// the front of toSendQueue_ that no remaining index still needs.
75 if (sessions_.size() == 0) {
80 auto newStartIt = begin;
// minIndex tracks the smallest toSendQueueIndex_ seen, starting from the
// current queue size; each index is read before that session's runOnce().
82 auto minIndex = toSendQueue_.size();
83 for (
auto it = sessions_.begin(); it != sessions_.end();) {
84 if (minIndex > (*it)->toSendQueueIndex_) {
85 minIndex = (*it)->toSendQueueIndex_;
// A session whose runOnce() returns false is erased; the post-increment moves
// the loop iterator forward before the erased element is invalidated.
87 if (hmbdc_unlikely(!(*it)->runOnce())) {
88 sessions_.erase(it++);
// NOTE(review): minIndex - 1 wraps around when minIndex is 0; a guard is
// presumably on a line missing from this listing - confirm.
94 newStartIt = get<3>(toSendQueue_[minIndex - 1]);
95 toSendQueue_.erase_begin(minIndex);
// Every remaining session index is rebased by the same minIndex that was
// passed to erase_begin.
96 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
97 (*it)->toSendQueueIndex_ -= minIndex;
// Counts the sessions in sessions_ whose ready() is true. The declaration and
// the return of res are on lines missing from this listing.
104 size_t readySessionCount()
const {
106 for (
auto const& s : sessions_) {
107 if (s->ready()) res++;
// Scans sessions_ for a session whose toSendQueueIndex_ is 0 and logs it as
// too slow. The removal itself is presumably on lines missing from this
// listing - confirm against the full header.
112 void killSlowestSession() {
113 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
114 if (0 == (*it)->toSendQueueIndex_) {
115 HMBDC_LOG_C((*it)->id(),
" too slow, dropping");
// Accepts a pending connection when serverFd_ reports ready, applies the
// configured socket options to it and builds a SendSession for it. Failures
// are logged through HMBDC_LOG_C. Several lines (including the try that pairs
// with the catch below and all closing braces) are missing from this listing.
123 void doAccept() HMBDC_RESTRICT {
124 if (hmbdc_unlikely(serverFd_.isFdReady())) {
// NOTE(review): sizeof yields a size_t, yet the address of addrlen is cast to
// socklen_t* below; the two types differ in width on common 64-bit platforms.
// Declaring addrlen as socklen_t would remove the cast.
125 auto addrlen =
sizeof(serverAddr_);
// NOTE(review): accept() writes the peer address into serverAddr_, a reference
// member initialized from serverFd_.localAddr - confirm that overwriting the
// stored address is intended.
126 auto conn = accept(serverFd_.fd, (
struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
128 if (!serverFd_.checkErr()) {
129 HMBDC_LOG_C(
"accept failure, errno=", errno);
// Send buffer size comes from the "tcpSendBufferBytes" setting.
133 auto sz = config_.getExt<
int>(
"tcpSendBufferBytes");
135 if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz,
sizeof(sz)) < 0) {
136 HMBDC_LOG_C(
"failed to set send buffer size=", sz,
" errno=", errno);
// TCP_NODELAY is set to 1 exactly when the "nagling" setting is false.
139 int flag = config_.getExt<
bool>(
"nagling")?0:1;
140 if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (
char*) &flag,
sizeof(flag)) < 0) {
141 HMBDC_LOG_C(
"failed to set TCP_NODELAY, errno=", errno);
// The session is created with the connKey currently stored in
// advertisingMessage_.
// NOTE(review): if constructing the session throws, nothing visible here
// closes conn - verify the fd is not leaked.
144 auto k = advertisingMessage_.connKey;
145 auto s = std::make_shared<SendSession>(conn, k, toSendQueue_, maxSendBatch_);
147 }
catch (std::exception
const& e) {
148 HMBDC_LOG_C(e.what());
// Unordered set of SendSession::ptr; presumably the type of sessions_, whose
// declaration is not visible in this listing.
154 using Sessions = unordered_set<SendSession::ptr>;
// Queue of pending sends; also handed to every SendSession created in doAccept.
156 ToSendQueue toSendQueue_;
// Reference member, bound to serverFd_.localAddr in the constructor's
// initializer list.
158 sockaddr_in& serverAddr_;
// Random source for connKey values; both are declared mutable.
160 mutable std::default_random_engine generator_;
161 mutable std::uniform_int_distribution<uint64_t> distribution_;
// Read from the "maxSendBatch" setting and passed to each new SendSession.
163 size_t maxSendBatch_;
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:226
class to hold an hmbdc configuration
Definition: Config.hpp:46
Definition: SendServer.hpp:25
Definition: BlockingBuffer.hpp:10
MonoLockFreeBuffer::iterator runOnce(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server's async send function and decide which items in buffer can be released ...
Definition: SendServer.hpp:72
Definition: Transport.hpp:24
Definition: Messages.hpp:83
Definition: LockFreeBufferMisc.hpp:89