1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/tips/udpcast/Transport.hpp" 4 #include "hmbdc/tips/udpcast/Messages.hpp" 5 #include "hmbdc/tips/udpcast/DefaultUserConfig.hpp" 6 #include "hmbdc/app/Client.hpp" 7 #include "hmbdc/comm/inet/Endpoint.hpp" 8 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 9 #include "hmbdc/time/Time.hpp" 10 #include "hmbdc/time/Rater.hpp" 11 #include "hmbdc/numeric/BitMath.hpp" 18 namespace hmbdc {
namespace tips {
namespace udpcast {
20 namespace sendtransportengine_detail {
21 HMBDC_CLASS_HAS_DECLARE(hmbdc_net_queued_ts);
33 size_t bufferedMessageCount()
const {
34 return buffer_.remainingSize();
37 size_t subscribingPartyDetectedCount(uint16_t tag)
const {
41 template <MessageTupleC Messages,
typename Node>
42 void advertiseFor(
Node const& node, uint16_t mod, uint16_t res) {
46 template <MessageC Message>
47 void queue(Message&& msg) {
49 auto it = buffer_.claim(n);
50 queue(it, std::forward<Message>(msg));
51 buffer_.commit(it, n);
54 template <
typename Message>
55 bool tryQueue(Message&& msg) {
57 auto it = buffer_.tryClaim(n);
59 queue(it, std::forward<Message>(msg));
60 buffer_.commit(it, n);
82 void runOnce(
bool alwaysPoll =
false) HMBDC_RESTRICT {
84 if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
85 utils::EpollTask::instance().poll();
106 size_t maxMessageSize_;
110 size_t maxSendBatch_;
113 size_t toSendMsgsHead_;
114 size_t toSendMsgsTail_;
115 mmsghdr* toSendPkts_;
116 size_t toSendPktsHead_;
117 size_t toSendPktsTail_;
118 std::vector<comm::inet::Endpoint> udpcastDests_;
121 outBufferSizePower2();
123 template<
typename M,
typename ... Messages>
125 using Message =
typename std::decay<M>::type;
126 static_assert(std::is_trivially_destructible<Message>::value,
"cannot send message with dtor");
128 char* addr =
static_cast<char*
>(s);
130 if (hmbdc_likely(
sizeof(Message) <= maxMessageSize_)) {
134 HMBDC_THROW(std::out_of_range
135 ,
"maxMessageSize too small to hold a message when constructing SendTransportEngine");
137 if constexpr (has_hmbdc_net_queued_ts<Message>::value) {
138 h->template wrapped<Message>().hmbdc_net_queued_ts = hmbdc::time::SysTime::now();
140 queue(++it, std::forward<Messages>(msgs)...);
149 ,
Client<SendTransportEngine> {
150 using SendTransport::SendTransport;
151 using SendTransport::hmbdcName;
152 using SendTransport::schedSpec;
167 using Transport::hmbdcName;
171 using SendTransport = sendtransportengine_detail::SendTransport;
172 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
177 namespace hmbdc {
namespace tips {
namespace udpcast {
179 namespace sendtransportengine_detail {
188 ,
size_t maxMessageSize)
189 : Transport((cfg.setAdditionalFallbackConfig(
Config(DefaultUserConfig))
190 , cfg.resetSection(
"tx", false)))
191 , maxMessageSize_(maxMessageSize)
192 , buffer_(maxMessageSize + sizeof(TransportMessageHeader) + sizeof(
MessageHead), outBufferSizePower2())
194 , config_.getExt<size_t>(
"sendBytesPerSec")
195 , config_.getExt<size_t>(
"sendBytesBurst")
196 , config_.getExt<size_t>(
"sendBytesBurst") != 0ul)
197 , maxSendBatch_(config_.getExt<size_t>(
"maxSendBatch"))
198 , toSendMsgs_(new iovec[maxSendBatch_])
201 , toSendPkts_(new mmsghdr[maxSendBatch_])
203 , toSendPktsTail_(0) {
205 char loopch = config_.getExt<
bool>(
"loopback")?1:0;
206 if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, (
char *)&loopch,
sizeof(loopch)) < 0) {
207 HMBDC_THROW(runtime_error,
"failed to set loopback=" << config_.getExt<
bool>(
"loopback"));
210 auto sz = config_.getExt<
int>(
"udpSendBufferBytes");
212 if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz,
sizeof(sz)) < 0) {
213 HMBDC_LOG_C(
"failed to set send buffer size=", sz);
217 auto ttl = config_.getExt<
int>(
"ttl");
218 if (ttl > 0 && setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl,
sizeof(ttl)) < 0) {
219 HMBDC_THROW(runtime_error,
"failed to set set ttl=" << ttl);
222 utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, *
this);
223 auto minHead =
sizeof(
MessageHead) +
sizeof(TransportMessageHeader);
224 if (maxMessageSize_ + minHead > mtu_) {
225 HMBDC_THROW(std::out_of_range,
"maxMessageSize needs <= " << mtu_ - minHead);
227 cfg(udpcastDests_,
"udpcastDests");
228 if (udpcastDests_.size() == 0) {
229 HMBDC_THROW(std::out_of_range,
"empty udpcastDests");
231 memset(toSendMsgs_, 0,
sizeof(iovec) * maxSendBatch_);
232 memset(toSendPkts_, 0,
sizeof(mmsghdr) * maxSendBatch_);
233 std::for_each(toSendPkts_, toSendPkts_ + maxSendBatch_
234 , [
this](mmsghdr& pkt) {
235 pkt.msg_hdr.msg_name = &udpcastDests_.begin()->v;
236 pkt.msg_hdr.msg_namelen =
sizeof(udpcastDests_.begin()->v);
243 delete []toSendPkts_;
244 delete []toSendMsgs_;
257 outBufferSizePower2() {
258 auto res = config_.
getExt<uint16_t>(
"outBufferSizePower2");
262 res =hmbdc::numeric::log2Upper(8ul * 1024ul / (8ul + maxMessageSize_));
263 HMBDC_LOG_N(
"auto set --outBufferSizePower2=", res);
270 resumeSend() HMBDC_RESTRICT {
274 buffer_.wasteAfterPeek(begin_, end_ - begin_);
275 buffer_.peek(it_, end_, maxSendBatch_);
279 size_t packetBytes = 0;
280 while (it_ != end_ && toSendMsgsTail_ < maxSendBatch_) {
282 auto item =
static_cast<TransportMessageHeader*
>(ptr);
283 rateOk = rater_.check(item->wireSize());
284 if (hmbdc_unlikely(!rateOk)) {
287 packetBytes += item->wireSize();
288 if (hmbdc_unlikely(packetBytes > mtu_)) {
289 toSendPkts_[toSendPktsTail_].msg_hdr.msg_iov = toSendMsgs_ + toSendMsgsHead_;
290 toSendPkts_[toSendPktsTail_++].msg_hdr.msg_iovlen
291 = toSendMsgsTail_ - toSendMsgsHead_;
292 toSendMsgsHead_ = toSendMsgsTail_;
293 packetBytes = item->wireSize();
296 toSendMsgs_[toSendMsgsTail_].iov_base = ptr;
297 toSendMsgs_[toSendMsgsTail_++].iov_len = item->wireSize();
303 toSendPkts_[toSendPktsTail_].msg_hdr.msg_iov = toSendMsgs_ + toSendMsgsHead_;
304 toSendPkts_[toSendPktsTail_++].msg_hdr.msg_iovlen
305 = toSendMsgsTail_ - toSendMsgsHead_;
306 toSendMsgsHead_ = toSendMsgsTail_;
309 auto toSendPktsCount = toSendPktsTail_ - toSendPktsHead_;
310 if (toSendPktsCount && isFdReady()) {
312 l = sendmmsg(fd, toSendPkts_ + toSendPktsHead_, toSendPktsCount, MSG_NOSIGNAL|MSG_DONTWAIT);
313 if (hmbdc_unlikely(udpcastDests_.size() > 1)) {
314 for (
auto i = 1u; hmbdc_unlikely(i < udpcastDests_.size()); ++i) {
315 auto& addr = udpcastDests_[i].v;
316 std::for_each(toSendPkts_ + toSendPktsHead_, toSendPkts_ + toSendPktsHead_ + toSendPktsCount
317 , [&addr](mmsghdr& pkt) {
318 pkt.msg_hdr.msg_name = &addr;
321 l = std::max(l, sendmmsg(fd, toSendPkts_ + toSendPktsHead_, toSendPktsCount, MSG_NOSIGNAL|MSG_DONTWAIT));
323 std::for_each(toSendPkts_ + toSendPktsHead_, toSendPkts_ + toSendPktsHead_ + toSendPktsCount
324 , [
this](mmsghdr& pkt) {
325 pkt.msg_hdr.msg_name = &udpcastDests_[0].v;
329 if (hmbdc_likely(l > 0)) {
330 }
else if (hmbdc_likely(l < 0)) {
331 if (hmbdc_unlikely(!checkErr())) {
332 HMBDC_LOG_C(
"sendmmsg failed errno=", errno);
339 toSendPktsHead_ += l;
340 if (toSendPktsHead_ == toSendPktsTail_) {
341 toSendPktsHead_ = toSendPktsTail_ = 0;
342 toSendMsgsHead_ = toSendMsgsTail_ = 0;
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: MonoLockFreeBuffer.hpp:16
Definition: Transport.hpp:39
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:162
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: TypedString.hpp:84
Definition: SendTransportEngine.hpp:147
Definition: SendTransportEngine.hpp:28
Definition: BlockingBuffer.hpp:11
Definition: Message.hpp:212
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:159
Definition: Message.hpp:263
a Node is a thread of execution that can subscribe and receive Messages
Definition: Node.hpp:51
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: LockFreeBufferMisc.hpp:89