hmbdc
simplify-high-performance-messaging-programming
McRecvTransport.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/rmcast/Transport.hpp"
4 #include "hmbdc/tips/udpcast/Transport.hpp"
5 #include "hmbdc/app/Logger.hpp"
6 #include "hmbdc/comm/inet/Endpoint.hpp"
7 #include "hmbdc//MetaUtils.hpp"
8 
9 #include <boost/bind.hpp>
10 #include <boost/lexical_cast.hpp>
11 
12 #include <memory>
13 #include <type_traits>
14 
15 namespace hmbdc { namespace tips { namespace rmcast {
16 
17 namespace mcrecvtransport_detail {
18 using namespace std;
19 using namespace hmbdc::app;
20 using namespace hmbdc::time;
21 
22 /**
23  * @brief impl class
24  *
25  * @tparam OutputBuffer type of buffer to hold resulting network messages
26  * between different recv transport. By default, keeping all
27  */
28 template <typename OutputBuffer, typename Ep2SessionDict>
30 : Transport {
31  using SELF = McRecvTransport;
32  friend struct NetContext; //only be created by NetContext
33 
34  McRecvTransport(Config const& cfg
36  , TypeTagSet const& subscriptions
37  , Ep2SessionDict& sessionDict)
38  : Transport(cfg)
39  , cmdBuffer_(cmdBuffer)
40  , sendFrom_{0}
41  , mcFd_(cfg)
42  , buf_(new char[mtu_])
43  , bufCur_(nullptr)
44  , bytesRecved_(0)
45  , subscriptions_(subscriptions)
46  , sessionDict_(sessionDict) {
47  uint32_t yes = 1;
48  if (setsockopt(mcFd_.fd, SOL_SOCKET,SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
49  HMBDC_LOG_C("failed to set reuse address errno=", errno);
50  }
51  auto sz = config_.getExt<int>("udpRecvBufferBytes");
52  if (sz) {
53  if (setsockopt(mcFd_.fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
54  HMBDC_LOG_C("failed to set send buffer size=", sz);
55  }
56  }
57 
58  /* bind to receive address */
59  auto mcAddr = comm::inet::Endpoint(config_.getExt<std::string>("mcastAddr")
60  , config_.getExt<uint16_t>("mcastPort")).v;
61  if (::bind(mcFd_.fd, (struct sockaddr *)&mcAddr, sizeof(mcAddr)) < 0) {
62  HMBDC_THROW(runtime_error, "failed to bind "
63  << config_.getExt<string>("mcastAddr") << ':'
64  << cfg.getExt<short>("mcastPort"));
65  }
66  /* use setsockopt() to request that the kernel join a multicast group */
67  struct ip_mreq mreq;
68  mreq.imr_multiaddr.s_addr=inet_addr(config_.getExt<string>("mcastAddr").c_str());
69  auto iface =
70  comm::inet::getLocalIpMatchMask(config_.getExt<string>("ifaceAddr"));
71  mreq.imr_interface.s_addr=inet_addr(iface.c_str());
72  if (setsockopt(mcFd_.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
73  HMBDC_THROW(runtime_error, "failed to join " << config_.getExt<string>("mcastAddr") << ':'
74  << cfg.getExt<short>("mcastPort"));
75  }
76  }
77 
78  ~McRecvTransport() {
79  }
80 
81 /**
82  * @brief start the show by schedule the mesage recv
83  */
84  void start() {
85  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, mcFd_);
86  }
87 
88 /**
89  * @brief power the io_service and other things
90  *
91  */
92  void runOnce() HMBDC_RESTRICT {
93  resumeRead();
94  }
95 
96 private:
97  void resumeRead() {
98  do {
99  if (hmbdc_unlikely(sendFrom_.sin_family != AF_INET)) {
100  bytesRecved_ = 0;
101  }
102  if (hmbdc_likely(bytesRecved_)) {
103  if (!bufCur_) {
104  bufCur_ = buf_;
105  }
106  while (bytesRecved_ >= sizeof(TransportMessageHeader)) {
107  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
108  auto wireSize = h->wireSize();
109  if (hmbdc_likely(bytesRecved_ >= wireSize)) {
110  if (hmbdc_unlikely(h->typeTag() == TypeTagBackupSource::typeTag)) {
111  auto it = cmdBuffer_.claim();
112  auto b = static_cast<MessageHead*>(*it);
113  size_t l = h->messagePayloadLen;
114  l = std::min(cmdBuffer_.maxItemSize(), l);
115  memcpy(b, h->payload(), l);
116  auto& bts = b->template get<TypeTagBackupSource>();
117  bts.sendFrom = sendFrom_;
118  cmdBuffer_.commit(it);
119  } else {
120  auto session = sessionDict_.find(sendFrom_);
121  if (hmbdc_unlikely(session != sessionDict_.end())) {
122  auto a = session->second->accept(h);
123  if (a == 0) {
124  return; //wait for backup
125  }
126  }
127  }
128  bytesRecved_ -= wireSize;
129  bufCur_ += wireSize;
130  } else {
131  break;
132  }
133  }
134  }
135  bufCur_ = nullptr;
136  bytesRecved_ = 0;
137  auto addrLen = sizeof(sendFrom_);
138  if (mcFd_.isFdReady()) {
139  auto l = recvfrom(mcFd_.fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT
140  , (sockaddr*)&sendFrom_, (socklen_t*)&addrLen);
141  if (hmbdc_unlikely(l < 0)) {
142  if (!mcFd_.checkErr()) {
143  HMBDC_LOG_C("recvmsg failed errno=", errno);
144  }
145  return;
146  } else if (l == 0) {
147  //nothing to do now
148  return;
149  }
150  bytesRecved_ = l;
151  }
152  } while(bytesRecved_);
153  }
154 
155  hmbdc::pattern::MonoLockFreeBuffer& HMBDC_RESTRICT cmdBuffer_;
156  sockaddr_in sendFrom_;
157  udpcast::EpollFd mcFd_;
158  char* buf_;
159  char* bufCur_;
160  size_t bytesRecved_;
161  TypeTagSet const& HMBDC_RESTRICT subscriptions_;
162  Ep2SessionDict& HMBDC_RESTRICT sessionDict_;
163 };
164 } //mcrecvtransport_detail
165 template <typename OutputBuffer, typename Ep2SessionDict>
166 using McRecvTransport = mcrecvtransport_detail::McRecvTransport<OutputBuffer, Ep2SessionDict>;
167 
168 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: MonoLockFreeBuffer.hpp:16
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
impl class
Definition: McRecvTransport.hpp:29
Definition: Endpoint.hpp:17
Definition: TypedString.hpp:84
void runOnce() HMBDC_RESTRICT
power the io_service and other things
Definition: McRecvTransport.hpp:92
Definition: Message.hpp:212
void start()
start the show by schedule the mesage recv
Definition: McRecvTransport.hpp:84
Definition: TypeTagSet.hpp:141
Definition: Rater.hpp:10
Definition: Base.hpp:12