hmbdc
simplify-high-performance-messaging-programming
RecvTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/udpcast/Transport.hpp"
4 #include "hmbdc/tips/udpcast/DefaultUserConfig.hpp"
5 #include "hmbdc/tips/TypeTagSet.hpp"
6 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
7 #include "hmbdc//MetaUtils.hpp"
8 
9 #include <boost/lexical_cast.hpp>
10 
11 #include <memory>
12 #include <type_traits>
13 #include <mutex>
14 
15 namespace hmbdc { namespace tips { namespace udpcast {
16 
17 /**
18  * @brief interface to power a multicast transport receiving functions
19  */
21  using Transport::Transport;
22  virtual ~RecvTransport(){}
23 };
24 
25 namespace recvtransportengine_detail {
26 using namespace hmbdc::app;
27 using namespace std;
29 
30 /**
31  * @class RecvTransportImpl<>
32  * @brief impl class
33  *
34  * @tparam OutputBuffer type of buffer to hold resulting network messages
35  */
36 template <typename OutputBuffer>
38 : RecvTransport {
40  , OutputBuffer& outputBuffer)
41  : RecvTransport((cfg.setAdditionalFallbackConfig(Config{DefaultUserConfig})
42  , cfg.resetSection("rx", false)))
43  , outputBuffer_(outputBuffer)
44  , maxItemSize_(outputBuffer.maxItemSize())
45  , buf_(new char[mtu_])
46  , bufCur_(buf_)
47  , bytesRecved_(0) {
48  uint32_t yes = 1;
49  if (setsockopt(fd, SOL_SOCKET,SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
50  HMBDC_LOG_C("failed to set reuse address errno=", errno);
51  }
52  auto sz = config_.getExt<int>("udpRecvBufferBytes");
53  if (sz) {
54  if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)) < 0) {
55  HMBDC_LOG_C("failed to set send buffer size=", sz);
56  }
57  }
58 
59  auto udpcastListenPort = config_.getExt<uint16_t>("udpcastListenPort");
60  struct sockaddr_in udpcastListenAddrPort;
61  memset(&udpcastListenAddrPort, 0, sizeof(udpcastListenAddrPort));
62  udpcastListenAddrPort.sin_family = AF_INET;
63  auto ipStr = cfg.getExt<string>("udpcastListenAddr") == string("ifaceAddr")
64  ? comm::inet::getLocalIpMatchMask(cfg.getExt<string>("ifaceAddr")):cfg.getExt<string>("udpcastListenAddr");
65  udpcastListenAddrPort.sin_addr.s_addr = inet_addr(ipStr.c_str());
66  udpcastListenAddrPort.sin_port = htons(udpcastListenPort);
67  if (::bind(fd, (struct sockaddr *)&udpcastListenAddrPort, sizeof(udpcastListenAddrPort)) < 0) {
68  HMBDC_THROW(runtime_error, "failed to bind unicast udpcast listen address "
69  << ipStr << ':' << cfg.getExt<short>("udpcastListenPort") << " errno=" << errno);
70  }
71 
72  if ((udpcastListenAddrPort.sin_addr.s_addr & 0x000000F0) == 0xE0) {//multicast address
73  /* use setsockopt() to request that the kernel join a multicast group */
74  struct ip_mreq mreq;
75  mreq.imr_multiaddr.s_addr = udpcastListenAddrPort.sin_addr.s_addr;
76  auto iface =
77  comm::inet::getLocalIpMatchMask(config_.getExt<string>("ifaceAddr"));
78  mreq.imr_interface.s_addr=inet_addr(iface.c_str());
79  if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
80  HMBDC_THROW(runtime_error, "failed to join " << ipStr << ':'
81  << cfg.getExt<short>("udpcastPort"));
82  }
83  }
84  HMBDC_LOG_N("listen at ", cfg.getExt<string>("ifaceAddr"));
85 
86  start();
87  }
88 
89  template <MessageTupleC Messages, typename CcNode>
90  void subscribeFor(CcNode const& node, uint16_t mod, uint16_t res) {
91  subscriptions_.addSubsFor<Messages>(node, mod, res);
92  }
93 
94  template <MessageC Message>
95  void subscribe() {
96  subscriptions_.add(Message::typeTag);
97  }
98 
100  delete [] buf_;
101  }
102 
103 /**
104  * @brief start the show by schedule the mesage recv
105  */
106  void start() {
107  utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, *this);
108  }
109 
110  void runOnce(bool alwaysPoll = false) HMBDC_RESTRICT {
111  if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
112  utils::EpollTask::instance().poll();
113  }
114  resumeRead();
115  }
116 
117  sockaddr_in udpcastListenRemoteAddr = {0};
118 private:
119  void resumeRead() HMBDC_RESTRICT {
120  do {
121  while (bytesRecved_) {
122  auto h = reinterpret_cast<TransportMessageHeader*>(bufCur_);
123  auto wireSize = h->wireSize();
124 
125  if (hmbdc_likely(bytesRecved_ >= wireSize)) {
126  if (subscriptions_.check(h->typeTag())) {
127  if (hmbdc_unlikely(wireSize > bytesRecved_)) {
128  break;
129  }
130  auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
131  outputBuffer_.put(h->payload(), l);
132  }
133  bytesRecved_ -= wireSize;
134  bufCur_ += wireSize;
135  } else {
136  break;
137  }
138  }
139  bytesRecved_ = 0;
140  bufCur_ = buf_;
141  if (isFdReady()) {
142  socklen_t addrLen = sizeof(udpcastListenRemoteAddr);
143  auto l = recvfrom(fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT
144  , (struct sockaddr *)&udpcastListenRemoteAddr, &addrLen);
145  if (hmbdc_unlikely(l < 0)) {
146  if (!checkErr()) {
147  HMBDC_LOG_C("recvmmsg failed errno=", errno);
148  }
149  return;
150  } else if (l == 0) {
151  //nothing to do now
152  return;
153  }
154  bytesRecved_ = l;
155  }
156  } while(bytesRecved_);
157  }
158 
159  OutputBuffer& outputBuffer_;
160  size_t maxItemSize_;
161  char* buf_;
162  char* bufCur_;
163  size_t bytesRecved_;
164  TypeTagSet subscriptions_;
165 };
166 
167 template <typename OutputBuffer>
169 : RecvTransportImpl<OutputBuffer>
170 , Client<RecvTransportEngineImpl<OutputBuffer>> {
174 
175  void rotate() {
177  }
178 
179 /**
180  * @brief power the io_service and other things
181  *
182  */
183  /*virtual*/
184  void invokedCb(size_t) HMBDC_RESTRICT override {
186  }
187 
188  using Transport::hmbdcName;
189 /**
190  * @brief should not happen ever unless an exception thrown
191  *
192  * @param e exception thown
193  */
194  /*virtual*/
195  void stoppedCb(std::exception const& e) override {
196  HMBDC_LOG_C(e.what());
197  };
198 
199 private:
200 };
201 
202 } //recvtransportengine_detail
203 
204 template <typename OutputBuffer>
205 using RecvTransportImpl = recvtransportengine_detail::RecvTransportImpl<OutputBuffer>;
206 
207 template <typename OutputBuffer>
208 using RecvTransportEngine = recvtransportengine_detail::RecvTransportEngineImpl<OutputBuffer>;
209 
210 }}}
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: MonoLockFreeBuffer.hpp:16
Definition: Transport.hpp:39
Definition: Base.hpp:12
class to hold an hmbdc configuration
Definition: Config.hpp:45
void setAdditionalFallbackConfig(Config const &c)
set additional defaults
Definition: Config.hpp:154
Definition: TypedString.hpp:84
void invokedCb(size_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:184
Config & resetSection(char const *section, bool sectionExists=true)
change section name
Definition: Config.hpp:177
impl class
Definition: RecvTransportEngine.hpp:37
void start()
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:106
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception is thrown
Definition: RecvTransportEngine.hpp:195
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:20
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
Definition: Base.hpp:12