1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Logger.hpp" 4 #include "hmbdc/tips/tcpcast/SendServer.hpp" 5 #include "hmbdc/tips/tcpcast/Transport.hpp" 6 #include "hmbdc/tips/tcpcast/Messages.hpp" 7 #include "hmbdc/tips/udpcast/SendTransportEngine.hpp" 8 #include "hmbdc/tips/tcpcast/DefaultUserConfig.hpp" 9 #include "hmbdc//MetaUtils.hpp" 10 #include "hmbdc/time/Time.hpp" 11 #include "hmbdc/time/Rater.hpp" 12 #include "hmbdc/numeric/BitMath.hpp" 14 #include <boost/circular_buffer.hpp> 18 #include <type_traits> 21 namespace hmbdc {
namespace tips {
namespace tcpcast {
23 namespace send_detail {
24 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
28 using pattern::MonoLockFreeBuffer;
42 size_t subscribingPartyDetectedCount(uint16_t tag)
const {
43 return outboundSubscriptions_.check(tag);
46 size_t bufferedMessageCount()
const {
47 return buffer_.remainingSize();
50 template <MessageC Message>
51 void queue(Message&& msg) {
52 if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag())) {
53 using M =
typename std::decay<Message>::type;
54 if constexpr (std::is_base_of<hasMemoryAttachment, M>::value) {
60 auto it = buffer_.claim(n);
61 queue(it, std::forward<Message>(msg));
62 buffer_.commit(it, n);
65 template <MessageC Message>
66 bool tryQueue(Message&& msg) {
67 if (!minRecvToStart_ && !outboundSubscriptions_.check(msg.getTypeTag())) {
68 using M =
typename std::decay<Message>::type;
69 if constexpr (std::is_base_of<hasMemoryAttachment, M>::value) {
75 auto it = buffer_.tryClaim(n);
77 queue(it, std::forward<Message>(msg));
78 buffer_.commit(it, n);
84 template <
typename Message,
typename ... Args>
85 void queueInPlace(Args&&... args) {
86 static_assert(!std::is_base_of<app::JustBytes, Message>::value
87 ,
"use queueJustBytes instead");
88 static_assert(std::is_trivially_destructible<Message>::value
89 ,
"cannot send message with dtor");
90 static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
92 ,
"hasMemoryAttachment has to the first base for Message");
93 auto s = buffer_.claim();
94 char* addr =
static_cast<char*
>(*s);
97 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
100 h->flag = wrap->scratchpad().desc.flag;
102 HMBDC_THROW(std::out_of_range
103 ,
"maxMessageSize too small to hold a message");
108 void queueJustBytes(uint16_t tag,
void const* bytes,
size_t len
110 if (!minRecvToStart_ && !outboundSubscriptions_.check(tag)) {
116 auto s = buffer_.claim();
117 char* addr =
static_cast<char*
>(*s);
120 if (hmbdc_likely(len <= maxMessageSize_)) {
123 h->flag = wrap->scratchpad().desc.flag;
125 HMBDC_THROW(std::out_of_range
126 ,
"maxMessageSize too small to hold a message");
134 size_t maxMessageSize_;
135 size_t minRecvToStart_;
138 unique_ptr<udpcast::SendTransport> mcSendTransport_;
144 template<
typename M,
typename ... Messages>
146 using Message =
typename std::decay<M>::type;
147 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
148 static_assert(!std::is_base_of<hasMemoryAttachment, Message>::value
150 ,
"hasMemoryAttachment has to the first base for Message");
153 char* addr =
static_cast<char*
>(s);
156 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
159 h->flag = wrap->scratchpad().desc.flag;
161 HMBDC_THROW(std::out_of_range,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
163 if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
164 h->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
166 queue(++it, std::forward<Messages>(msgs)...);
176 ,
Client<SendTransportEngine> {
178 using SendTransport::hmbdcName;
179 using SendTransport::schedSpec;
181 template <MessageTupleC Messages,
typename Node>
182 void advertiseFor(
Node const& node, uint16_t mod, uint16_t res) {
183 std::scoped_lock<std::mutex> g(advertisedTypeTags_.lock);
184 advertisedTypeTags_.addPubsFor<Messages>(node, mod, res);
185 server_->advertisingMessages(advertisedTypeTags_);
189 this->checkTimers(time::SysTime::now());
196 mcSendTransport_->runOnce(
true);
212 return server_->readySessionCount();
217 auto seq = buffer_.readSeq();
218 if (seq == lastSeq_ && buffer_.isFull()) {
219 server_->killSlowestSession();
227 size_t maxSendBatch_;
228 bool waitForSlowReceivers_;
229 MonoLockFreeBuffer::iterator begin_;
233 std::optional<SendServer> server_;
239 using SendTransport = send_detail::SendTransport;
240 using SendTransportEngine = send_detail::SendTransportEngine;
243 namespace hmbdc {
namespace tips {
namespace tcpcast {
245 namespace send_detail {
248 using pattern::MonoLockFreeBuffer;
253 ,
size_t maxMessageSize)
254 :
Transport((cfg.setAdditionalFallbackConfig(
Config(DefaultUserConfig))
255 , cfg.resetSection(
"tx", false)))
256 , maxMessageSize_(maxMessageSize)
257 , minRecvToStart_(config_.getExt<size_t>(
"minRecvToStart"))
259 , config_.getExt<uint16_t>(
"outBufferSizePower2")
260 ?config_.getExt<uint16_t>(
"outBufferSizePower2")
261 :
hmbdc::numeric::log2Upper(128ul * 1024ul / maxMessageSize)
264 , config_.getExt<size_t>(
"sendBytesPerSec")
265 , config_.getExt<size_t>(
"sendBytesBurst")
266 , config_.getExt<size_t>(
"sendBytesBurst") != 0ul
268 , mcConfig_(config_) {
269 mcConfig_.
put(
"loopback",
true);
271 mcConfig_.
put(
"outBufferSizePower2", 3u);
285 SendTransportEngine::
286 SendTransportEngine(
Config const& cfg
287 ,
size_t maxMessageSize)
290 , mtu_(config_.getExt<size_t>(
"mtu") - 20 - 20)
291 , maxSendBatch_(config_.getExt<size_t>(
"maxSendBatch"))
292 , waitForSlowReceivers_(config_.getExt<bool>(
"waitForSlowReceivers"))
295 toSend_.reserve(maxSendBatch_ * 2);
297 MonoLockFreeBuffer::iterator end;
298 buffer_.peek(begin_, end, 0);
300 server_.emplace(config_, buffer_.capacity(), outboundSubscriptions_);
303 std::unique_lock<std::mutex> g(advertisedTypeTags_.lock, std::try_to_lock);
304 if (!g.owns_lock())
return;
305 for (
auto& ad : server_->advertisingMessages()) {
306 mcSendTransport_->queue(ad);
308 if (!waitForSlowReceivers_) cullSlow();
312 schedule(SysTime::now(), *
this);
317 SendTransportEngine::
318 runOnce() HMBDC_RESTRICT {
319 MonoLockFreeBuffer::iterator begin, end;
322 if (hmbdc_likely(!minRecvToStart_
323 || server_->readySessionCount() >= minRecvToStart_)) {
325 buffer_.peek(begin, end, maxSendBatch_);
327 buffer_.peek(begin, end, 0);
331 uint16_t currentTypeTag = 0;
332 size_t toSendByteSize = 0;
336 auto item =
static_cast<TransportMessageHeader*
>(ptr);
338 if (hmbdc_unlikely(!rater_.check(item->wireSize())))
break;
339 if (hmbdc_unlikely(!item->flag
340 && toSendByteSize + item->wireSize() > mtu_))
break;
342 if (hmbdc_unlikely(!currentTypeTag)) {
343 currentTypeTag = item->typeTag();
344 toSendByteSize = item->wireSize();
345 toSend_.push_back(iovec{ptr, item->wireSize()});
346 }
else if (item->typeTag() == currentTypeTag) {
347 toSendByteSize += item->wireSize();
348 toSend_.push_back(iovec{ptr, item->wireSize()});
350 server_->queue(currentTypeTag
356 currentTypeTag = item->typeTag();
357 toSendByteSize = item->wireSize();
358 toSend_.push_back(iovec{ptr, item->wireSize()});
361 if (hmbdc_unlikely(item->flag == hasMemoryAttachment::flag)) {
363 toSend_.push_back(iovec{a.attachment, a.len});
364 toSendByteSize += a.len;
369 if (toSend_.size()) {
370 server_->queue(currentTypeTag
378 auto newStart = server_->runOnce(begin, it);
380 for (
auto it = begin; it != newStart; ++it) {
382 auto item =
static_cast<TransportMessageHeader*
>(ptr);
383 if (hmbdc_unlikely(item->flag == hasMemoryAttachment::flag)) {
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:194
Definition: MonoLockFreeBuffer.hpp:16
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:211
SendTransport(Config, size_t)
ctor
Definition: SendTransportEngine.hpp:252
class to hold an hmbdc configuration
Definition: Config.hpp:45
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:200
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
Definition: SendTransportEngine.hpp:28
Definition: Transport.hpp:65
Definition: SendTransportEngine.hpp:172
Definition: Message.hpp:212
Definition: Message.hpp:391
Definition: Message.hpp:263
Config & put(Args &&... args)
forward the call to ptree's put but return Configure
Definition: Config.hpp:195
a Node is a thread of execution that can suscribe and receive Messages
Definition: Node.hpp:51
capture the transportation mechanism
Definition: SendTransportEngine.hpp:33
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:122
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
void wasteAfterPeek(iterator, size_t, bool=false)
if size not matching - please refer to the impl for details
Definition: MonoLockFreeBuffer.hpp:160
Definition: Timers.hpp:117
Definition: LockFreeBufferMisc.hpp:89