hmbdc
simplify-high-performance-messaging-programming
McSendTransport.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Base.hpp"
4 #include "hmbdc/tips/rmcast/Transport.hpp"
5 #include "hmbdc/tips/rmcast/Messages.hpp"
6 #include "hmbdc/tips/udpcast/Transport.hpp"
7 #include "hmbdc/app/utils/EpollTask.hpp"
8 #include "hmbdc/comm/inet/Endpoint.hpp"
9 #include "hmbdc/time/Time.hpp"
10 #include "hmbdc/time/Rater.hpp"
11 #include "hmbdc/pattern/LockFreeBufferT.hpp"
12 
13 
14 #include <boost/bind.hpp>
15 #include <memory>
16 
17 #include <iostream>
18 
19 namespace hmbdc { namespace tips { namespace rmcast {
20 
21 namespace mcsendtransport_detail {
22 using namespace std;
23 using namespace hmbdc::app;
24 using namespace hmbdc::time;
25 using namespace hmbdc::pattern;
26 
28 
30 : Transport {
31  using ptr = std::shared_ptr<McSendTransport>;
32  McSendTransport(Config const& cfg
33  , size_t maxMessageSize
34  , Buffer& buffer
35  , Rater& rater
36  , ToCleanupAttQueue& toCleanupAttQueue)
37  : Transport(cfg)
38  , maxMessageSize_(maxMessageSize)
39  , buffer_(buffer)
40  , wasteSize_(0)
41  , mcFd_(cfg)
42  , mcAddr_(comm::inet::Endpoint(config_.getExt<std::string>("mcastAddr")
43  , config_.getExt<uint16_t>("mcastPort")).v)
44  , toSend_{0}
45  , rater_(rater)
46  , maxSendBatch_(config_.getExt<size_t>("maxSendBatch"))
47  , adPending_(false)
48  , seqAlertPending_(false)
49  , seqAlert_(nullptr)
50  , startSending_(false)
51  , toCleanupAttQueue_(toCleanupAttQueue) {
52  toSend_.msg_name = &mcAddr_;
53  toSend_.msg_namelen = sizeof(mcAddr_);
54  auto totalHead = sizeof(app::MessageHead) + sizeof(TransportMessageHeader);
55  if (maxMessageSize_ + totalHead > mtu_) {
56  HMBDC_THROW(std::out_of_range, "maxMessageSize need <= " << mtu_ - totalHead);
57  }
58 
59  auto addr = seqAlertBuf_;
60  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
61  new (addr + sizeof(TransportMessageHeader)) MessageWrap<SeqAlert>();
62  h->messagePayloadLen = sizeof(MessageWrap<SeqAlert>);
63  h->setSeq(std::numeric_limits<HMBDC_SEQ_TYPE>::max());
64  seqAlert_ = &(h->wrapped<SeqAlert>());
65 
66  toSendMsgs_.reserve((maxSendBatch_ + 2) * 2); //double for MemorySeg cases
67 
68  auto ttl = config_.getExt<int>("ttl");
69  if (ttl > 0 && setsockopt(mcFd_.fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0) {
70  HMBDC_THROW(runtime_error, "failed to set set ttl=" << ttl << " errno=" << errno);
71  }
72  char loopch = config_.getExt<bool>("loopback")?1:0;
73  if (setsockopt(mcFd_.fd, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch)) < 0) {
74  HMBDC_THROW(runtime_error, "failed to set loopback=" << config_.getExt<bool>("loopback"));
75  }
76 
77  auto sz = config_.getExt<int>("udpSendBufferBytes");
78  if (sz) {
79  if (setsockopt(mcFd_.fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)) < 0) {
80  HMBDC_LOG_C("failed to set send buffer size=", sz);
81  }
82  }
83  utils::EpollTask::instance().add(utils::EpollTask::EPOLLOUT|utils::EpollTask::EPOLLET, mcFd_);
84  }
85 
86  ~McSendTransport(){
87  }
88 
89  void startSend(){
90  startSending_ = true;
91  }
92 
93  template <typename AdvertisingMessages>
94  void setAds(AdvertisingMessages const& ads) {
95  decltype(adBufs_) newAdBufs;
96  for (auto const& ad : ads) {
97  newAdBufs.emplace_back();
98  auto addr = newAdBufs.rbegin()->data();
99  auto h = new (addr) TransportMessageHeader;
100  new (addr + sizeof(TransportMessageHeader))
102  h->messagePayloadLen = sizeof(MessageWrap<TypeTagBackupSource>);
103  h->setSeq(std::numeric_limits<HMBDC_SEQ_TYPE>::max());
104  }
105  std::swap(adBufs_, newAdBufs);
106  }
107 
108  void setAdPending() {
109  adPending_ = true;
110  }
111 
112  void setSeqAlertPending() {
113  seqAlertPending_ = true;
114  }
115 
116 
117  void runOnce(size_t sessionCount) HMBDC_RESTRICT {
118  resumeSend(sessionCount);
119  }
120 
121  void stop(){
122  buffer_.reset(0);
123  }
124 
125 private:
127  size_t maxMessageSize_;
128  Buffer& HMBDC_RESTRICT buffer_;
129  size_t wasteSize_;
130 
131  udpcast::EpollFd mcFd_;
132  sockaddr_in mcAddr_;
133  msghdr toSend_;
134  Rater& HMBDC_RESTRICT rater_;
135  size_t maxSendBatch_;
136  std::vector<std::array<char, sizeof(TransportMessageHeader)
137  + sizeof(MessageWrap<TypeTagBackupSource>)>> adBufs_;
138  bool adPending_;
139  char seqAlertBuf_[sizeof(TransportMessageHeader) + sizeof(MessageWrap<SeqAlert>)];
140  bool seqAlertPending_;
141  SeqAlert* HMBDC_RESTRICT seqAlert_;
142  bool startSending_;
143  vector<iovec> toSendMsgs_;
144  ToCleanupAttQueue& toCleanupAttQueue_;
145 
146  void
147  resumeSend(size_t sessionCount) {
148  do {
149  if (hmbdc_likely(toSendMsgs_.size())) {
150  if (hmbdc_unlikely(!mcFd_.isFdReady())) return;
151 
152  toSend_.msg_iov = &(toSendMsgs_[0]);
153  toSend_.msg_iovlen = toSendMsgs_.size();
154  if (sendmsg(mcFd_.fd, &toSend_, MSG_DONTWAIT) < 0) {
155  if (!mcFd_.checkErr()) {
156  HMBDC_LOG_C("sendmmsg failed errno=", errno);
157  }
158  return;
159  } else {
160  buffer_.wasteAfterPeek(0, wasteSize_);
161  toSendMsgs_.clear();
162  wasteSize_ = 0;
163  }
164  }
165 
166  size_t batchBytes = 0;
167  if (hmbdc_unlikely(adPending_)) {
168  for (auto& adBuf : adBufs_) {
169  toSendMsgs_.push_back(iovec{(void*)adBuf.data(), adBuf.size()});
170  batchBytes += sizeof(adBuf);
171  }
172  adPending_ = false;
173  }
174 
175  if (hmbdc_likely(startSending_) && !toSendMsgs_.size()) {
176  Buffer::iterator begin, end;
177  buffer_.peek(0, begin, end, maxSendBatch_);
178  if (hmbdc_unlikely(!sessionCount)) {
179  buffer_.wasteAfterPeek(0u, end - begin);
180  begin = end;
181  }
182  auto it = begin;
183  while (it != end) {
184  void* ptr = *it;
185  auto item = static_cast<TransportMessageHeader*>(ptr);
186  if (hmbdc_unlikely(!rater_.check(item->wireSize()))) break;
187  batchBytes += item->wireSize();
188  if (hmbdc_unlikely(batchBytes > mtu_)) break;
189  if (hmbdc_unlikely(item->typeTag() == MemorySeg::typeTag)) {
190  toSendMsgs_.push_back(iovec{(void*)item->wireBytes(), item->wireSize() - item->wireSizeMemorySeg()});
191  toSendMsgs_.push_back(iovec{(void*)item->wireBytesMemorySeg(), item->wireSizeMemorySeg()});
192  } else {
193  if (hmbdc_unlikely(item->typeTag() == StartMemorySegTrain::typeTag)) {
194  auto& trainHead = item->template wrapped<StartMemorySegTrain>();
195  auto itActual = it; itActual.seq_ += trainHead.segCount + 1;
196  auto actual = static_cast<TransportMessageHeader*>(*itActual);
197  toCleanupAttQueue_.push_back(make_tuple(itActual.seq_
198  , &actual->template wrapped<hasMemoryAttachment>()
199  , trainHead.att.afterConsumedCleanupFunc));
200  }
201  toSendMsgs_.push_back(iovec{(void*)item->wireBytes(), item->wireSize()});
202  }
203  it++;
204 
205  seqAlert_->expectSeq = item->getSeq() + 1;
206  rater_.commit();
207  }
208  wasteSize_ = it - begin;
209  }
210  if (hmbdc_unlikely(seqAlertPending_)) {
211  if (!wasteSize_) {
212  toSendMsgs_.push_back(iovec{seqAlertBuf_, sizeof(seqAlertBuf_)});
213  batchBytes += sizeof(seqAlertBuf_);
214  } //else no need to send seq alert
215  seqAlertPending_ = false;
216  }
217  } while (hmbdc_likely(toSendMsgs_.size()));
218  }
219 };
220 
221 } //mcsendtransport_detail
223 }}}
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
Definition: Endpoint.hpp:17
Definition: TypedString.hpp:84
Definition: BlockingBuffer.hpp:11
Definition: Message.hpp:212
Definition: Messages.hpp:163
Definition: Transport.hpp:21
Definition: Message.hpp:263
Definition: Rater.hpp:10
Definition: Rater.hpp:11
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89