1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/tips/tcpcast/Transport.hpp" 5 #include "hmbdc/tips/tcpcast/Messages.hpp" 6 #include "hmbdc/tips/tcpcast/SendSession.hpp" 7 #include "hmbdc/time/Time.hpp" 8 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 9 #include "hmbdc/comm/inet/Misc.hpp" 11 #include <unordered_set> 16 #include <netinet/tcp.h> 18 namespace hmbdc {
namespace tips {
namespace tcpcast {
20 namespace sendserver_detail {
27 ,
size_t toSendQueueMaxSize
31 , serverAddr_(serverFd_.localAddr)
32 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch"))
33 , outboundSubscriptions_(outboundSubscriptions) {
34 if (listen(serverFd_.fd, 10) < 0) {
35 HMBDC_THROW(runtime_error,
"failed to listen, errno=" << errno);
37 utils::EpollTask::instance().add(
38 utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, serverFd_);
39 toSendQueue_.set_capacity(toSendQueueMaxSize);
43 decltype(advertisingMessages_) newAds;
44 tts.exportTo([&](uint16_t tag,
auto&& subCount) {
45 if (newAds.size() == 0
46 || !newAds.rbegin()->addTypeTag(tag)) {
47 newAds.emplace_back(serverFd_.localIp
49 , config_.getExt<
bool>(
"loopback"));
50 newAds.rbegin()->addTypeTag(tag);
53 std::swap(advertisingMessages_, newAds);
54 for (
auto& m : advertisingMessages_) {
55 HMBDC_LOG_N(
"advertise at ", m);
59 auto const& advertisingMessages()
const {
60 return advertisingMessages_;
63 void queue(uint16_t tag
65 ,
size_t toSendByteSize
67 toSendQueue_.push_back(std::forward_as_tuple(tag, std::forward<ToSend>(toSend), toSendByteSize, it));
83 if (sessions_.size() == 0) {
88 auto newStartIt = begin;
90 auto minIndex = toSendQueue_.size();
91 for (
auto it = sessions_.begin(); it != sessions_.end();) {
92 if (minIndex > (*it)->toSendQueueIndex_) {
93 minIndex = (*it)->toSendQueueIndex_;
95 if (hmbdc_unlikely(!(*it)->runOnce())) {
96 sessions_.erase(it++);
102 newStartIt = get<3>(toSendQueue_[minIndex - 1]);
103 toSendQueue_.erase_begin(minIndex);
104 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
105 (*it)->toSendQueueIndex_ -= minIndex;
112 size_t readySessionCount()
const {
114 for (
auto const& s : sessions_) {
115 if (s->ready()) res++;
120 void killSlowestSession() {
121 for (
auto it = sessions_.begin(); it != sessions_.end(); ++it) {
122 if (0 == (*it)->toSendQueueIndex_) {
123 HMBDC_LOG_C((*it)->id(),
" too slow, dropping");
131 void doAccept() HMBDC_RESTRICT {
132 if (hmbdc_unlikely(serverFd_.isFdReady())) {
133 auto addrlen =
sizeof(serverAddr_);
134 auto conn = accept(serverFd_.fd, (
struct sockaddr *)&serverAddr_, (socklen_t*)&addrlen);
136 if (!serverFd_.checkErr()) {
137 HMBDC_LOG_C(
"accept failure, errno=", errno);
141 auto sz = config_.getExt<
int>(
"tcpSendBufferBytes");
143 if (setsockopt(conn, SOL_SOCKET, SO_SNDBUF, &sz,
sizeof(sz)) < 0) {
144 HMBDC_LOG_C(
"failed to set send buffer size=", sz,
" errno=", errno);
147 int flag = config_.getExt<
bool>(
"nagling")?0:1;
148 if (setsockopt(conn, IPPROTO_TCP, TCP_NODELAY, (
char*) &flag,
sizeof(flag)) < 0) {
149 HMBDC_LOG_C(
"failed to set TCP_NODELAY, errno=", errno);
152 auto s = std::make_shared<SendSession>(
153 conn, toSendQueue_, maxSendBatch_, outboundSubscriptions_);
155 }
catch (std::exception
const& e) {
156 HMBDC_LOG_C(e.what());
162 using Sessions = unordered_set<SendSession::ptr>;
164 ToSendQueue toSendQueue_;
166 sockaddr_in& serverAddr_;
167 mutable std::vector<TypeTagSource> advertisingMessages_;
169 size_t maxSendBatch_;
170 TypeTagSet& outboundSubscriptions_;
173 using SendServer = sendserver_detail::SendServer;
MonoLockFreeBuffer::iterator runOnce(MonoLockFreeBuffer::iterator begin, MonoLockFreeBuffer::iterator end) HMBDC_RESTRICT
run the server's async send function and decide which items in buffer can be release ...
Definition: SendServer.hpp:80
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: SendServer.hpp:25
Definition: TypedString.hpp:84
Definition: BlockingBuffer.hpp:11
Definition: LockFreeBufferMisc.hpp:89