1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Base.hpp" 4 #include "hmbdc/tips/rmcast/Transport.hpp" 5 #include "hmbdc/tips/rmcast/Messages.hpp" 6 #include "hmbdc/tips/udpcast/Transport.hpp" 7 #include "hmbdc/app/utils/EpollTask.hpp" 8 #include "hmbdc/comm/inet/Endpoint.hpp" 9 #include "hmbdc/time/Time.hpp" 10 #include "hmbdc/time/Rater.hpp" 11 #include "hmbdc/pattern/LockFreeBufferT.hpp" 14 #include <boost/bind.hpp> 19 namespace hmbdc {
namespace tips {
namespace rmcast {
21 namespace mcsendtransport_detail {
31 using ptr = std::shared_ptr<McSendTransport>;
33 ,
size_t maxMessageSize
36 , ToCleanupAttQueue& toCleanupAttQueue)
38 , maxMessageSize_(maxMessageSize)
43 , config_.getExt<uint16_t>(
"mcastPort")).v)
46 , maxSendBatch_(config_.getExt<
size_t>(
"maxSendBatch"))
48 , seqAlertPending_(
false)
50 , startSending_(
false)
51 , toCleanupAttQueue_(toCleanupAttQueue) {
52 toSend_.msg_name = &mcAddr_;
53 toSend_.msg_namelen =
sizeof(mcAddr_);
55 if (maxMessageSize_ + totalHead > mtu_) {
56 HMBDC_THROW(std::out_of_range,
"maxMessageSize need <= " << mtu_ - totalHead);
59 auto addr = seqAlertBuf_;
63 h->setSeq(std::numeric_limits<HMBDC_SEQ_TYPE>::max());
64 seqAlert_ = &(h->wrapped<
SeqAlert>());
66 toSendMsgs_.reserve((maxSendBatch_ + 2) * 2);
68 auto ttl = config_.getExt<
int>(
"ttl");
69 if (ttl > 0 && setsockopt(mcFd_.fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl,
sizeof(ttl)) < 0) {
70 HMBDC_THROW(runtime_error,
"failed to set set ttl=" << ttl <<
" errno=" << errno);
72 char loopch = config_.getExt<
bool>(
"loopback")?1:0;
73 if (setsockopt(mcFd_.fd, IPPROTO_IP, IP_MULTICAST_LOOP, (
char *)&loopch,
sizeof(loopch)) < 0) {
74 HMBDC_THROW(runtime_error,
"failed to set loopback=" << config_.getExt<
bool>(
"loopback"));
77 auto sz = config_.getExt<
int>(
"udpSendBufferBytes");
79 if (setsockopt(mcFd_.fd, SOL_SOCKET, SO_SNDBUF, &sz,
sizeof(sz)) < 0) {
80 HMBDC_LOG_C(
"failed to set send buffer size=", sz);
83 utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, mcFd_);
93 template <
typename AdvertisingMessages>
94 void setAds(AdvertisingMessages
const& ads) {
95 decltype(adBufs_) newAdBufs;
96 for (
auto const& ad : ads) {
97 newAdBufs.emplace_back();
98 auto addr = newAdBufs.rbegin()->data();
103 h->setSeq(std::numeric_limits<HMBDC_SEQ_TYPE>::max());
105 std::swap(adBufs_, newAdBufs);
108 void setAdPending() {
112 void setSeqAlertPending() {
113 seqAlertPending_ =
true;
117 void runOnce(
size_t sessionCount) HMBDC_RESTRICT {
118 resumeSend(sessionCount);
127 size_t maxMessageSize_;
128 Buffer& HMBDC_RESTRICT buffer_;
134 Rater& HMBDC_RESTRICT rater_;
135 size_t maxSendBatch_;
140 bool seqAlertPending_;
143 vector<iovec> toSendMsgs_;
144 ToCleanupAttQueue& toCleanupAttQueue_;
147 resumeSend(
size_t sessionCount) {
149 if (hmbdc_likely(toSendMsgs_.size())) {
150 if (hmbdc_unlikely(!mcFd_.isFdReady()))
return;
152 toSend_.msg_iov = &(toSendMsgs_[0]);
153 toSend_.msg_iovlen = toSendMsgs_.size();
154 if (sendmsg(mcFd_.fd, &toSend_, MSG_DONTWAIT) < 0) {
155 if (!mcFd_.checkErr()) {
156 HMBDC_LOG_C(
"sendmmsg failed errno=", errno);
160 buffer_.wasteAfterPeek(0, wasteSize_);
166 size_t batchBytes = 0;
167 if (hmbdc_unlikely(adPending_)) {
168 for (
auto& adBuf : adBufs_) {
169 toSendMsgs_.push_back(iovec{(
void*)adBuf.data(), adBuf.size()});
170 batchBytes +=
sizeof(adBuf);
175 if (hmbdc_likely(startSending_) && !toSendMsgs_.size()) {
177 buffer_.peek(0, begin, end, maxSendBatch_);
178 if (hmbdc_unlikely(!sessionCount)) {
179 buffer_.wasteAfterPeek(0u, end - begin);
186 if (hmbdc_unlikely(!rater_.check(item->wireSize())))
break;
187 batchBytes += item->wireSize();
188 if (hmbdc_unlikely(batchBytes > mtu_))
break;
189 if (hmbdc_unlikely(item->typeTag() == MemorySeg::typeTag)) {
190 toSendMsgs_.push_back(iovec{(
void*)item->wireBytes(), item->wireSize() - item->wireSizeMemorySeg()});
191 toSendMsgs_.push_back(iovec{(
void*)item->wireBytesMemorySeg(), item->wireSizeMemorySeg()});
193 if (hmbdc_unlikely(item->typeTag() == StartMemorySegTrain::typeTag)) {
194 auto& trainHead = item->template wrapped<StartMemorySegTrain>();
195 auto itActual = it; itActual.seq_ += trainHead.segCount + 1;
197 toCleanupAttQueue_.push_back(make_tuple(itActual.seq_
198 , &actual->template wrapped<hasMemoryAttachment>()
199 , trainHead.att.afterConsumedCleanupFunc));
201 toSendMsgs_.push_back(iovec{(
void*)item->wireBytes(), item->wireSize()});
205 seqAlert_->expectSeq = item->getSeq() + 1;
208 wasteSize_ = it - begin;
210 if (hmbdc_unlikely(seqAlertPending_)) {
212 toSendMsgs_.push_back(iovec{seqAlertBuf_,
sizeof(seqAlertBuf_)});
213 batchBytes +=
sizeof(seqAlertBuf_);
215 seqAlertPending_ =
false;
217 }
while (hmbdc_likely(toSendMsgs_.size()));
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: Endpoint.hpp:17
Definition: TypedString.hpp:84
Definition: McSendTransport.hpp:29
Definition: BlockingBuffer.hpp:11
Definition: Message.hpp:212
Definition: Messages.hpp:163
Definition: Transport.hpp:21
Definition: Message.hpp:263
Definition: Transport.hpp:18
Definition: LockFreeBufferMisc.hpp:89