1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/udpcast/Transport.hpp" 4 #include "hmbdc/text/StringTrieSet.hpp" 5 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 6 #include "hmbdc//MetaUtils.hpp" 8 #include <boost/lexical_cast.hpp> 11 #include <type_traits> 14 namespace hmbdc {
namespace app {
namespace udpcast {
20 using ptr = std::shared_ptr<RecvTransport>;
21 using Transport::Transport;
40 virtual void listenTo(
Topic const& t) = 0;
41 virtual void stopListenTo(
Topic const& t) = 0;
44 namespace recvtransportengine_detail {
57 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
60 using MD = MessageDispacher<RecvTransportImpl, std::tuple<Subscribe, Unsubscribe>>;
78 , OutputBuffer& outputBuffer
79 , MsgArbitrator arb =
NoOpArb())
82 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
83 , outputBuffer_(outputBuffer)
84 , maxItemSize_(outputBuffer.maxItemSize())
85 , buf_(new char[mtu_])
90 if (setsockopt(fd, SOL_SOCKET,SO_REUSEADDR, &yes,
sizeof(yes)) < 0) {
91 HMBDC_LOG_C(
"failed to set reuse address errno=", errno);
93 auto sz = config_.getExt<
int>(
"udpRecvBufferBytes");
95 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz,
sizeof(sz)) < 0) {
96 HMBDC_LOG_C(
"failed to set send buffer size=", sz);
100 auto udpcastListenPort = config_.getExt<uint16_t>(
"udpcastListenPort");
101 struct sockaddr_in udpcastListenAddrPort;
102 memset(&udpcastListenAddrPort, 0,
sizeof(udpcastListenAddrPort));
103 udpcastListenAddrPort.sin_family = AF_INET;
104 auto ipStr = cfg.
getExt<
string>(
"udpcastListenAddr") ==
string(
"ifaceAddr")
105 ? comm::inet::getLocalIpMatchMask(cfg.
getExt<
string>(
"ifaceAddr")):cfg.
getExt<
string>(
"udpcastListenAddr");
106 udpcastListenAddrPort.sin_addr.s_addr = inet_addr(ipStr.c_str());
107 udpcastListenAddrPort.sin_port = htons(udpcastListenPort);
108 if (bind(fd, (
struct sockaddr *)&udpcastListenAddrPort,
sizeof(udpcastListenAddrPort)) < 0) {
109 HMBDC_THROW(runtime_error,
"failed to bind unicast udpcast listen address " 110 << ipStr <<
':' << cfg.
getExt<
short>(
"udpcastListenPort") <<
" errno=" << errno);
113 if ((udpcastListenAddrPort.sin_addr.s_addr & 0x000000F0) == 0xE0) {
116 mreq.imr_multiaddr.s_addr = udpcastListenAddrPort.sin_addr.s_addr;
118 comm::inet::getLocalIpMatchMask(config_.getExt<
string>(
"ifaceAddr"));
119 mreq.imr_interface.s_addr=inet_addr(iface.c_str());
120 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0) {
121 HMBDC_THROW(runtime_error,
"failed to join " << ipStr <<
':' 122 << cfg.
getExt<
short>(
"udpcastPort"));
125 HMBDC_LOG_N(
"listen at ", cfg.
getExt<
string>(
"ifaceAddr"));
138 utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, *
this);
141 void runOnce(
bool alwaysPoll =
false) HMBDC_RESTRICT {
143 auto n = buffer_.peek(begin, end);
148 buffer_.wasteAfterPeek(begin, n);
149 if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
150 utils::EpollTask::instance().poll();
159 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
166 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
170 void listenTo(
Topic const& t)
override {
174 void stopListenTo(
Topic const& t)
override {
177 sockaddr_in udpcastListenRemoteAddr = {0};
179 template <
bool is_raw_arb>
180 typename enable_if<is_raw_arb, int>::type applyRawArb(
void* pkt,
size_t len) {
181 return arb_(pkt, len);
184 template <
bool is_raw_arb>
185 typename enable_if<!is_raw_arb, int>::type applyRawArb(
void*,
size_t) {
189 template <
bool is_raw_arb>
194 template <
bool is_raw_arb>
199 void resumeRead() HMBDC_RESTRICT {
202 using arg0 =
typename traits::template arg<0>::type;
203 bool const is_raw_arb = std::is_same<void*, typename std::decay<arg0>::type>::value;
205 if (bytesRecved_ && bufCur_ == buf_) {
206 auto a = applyRawArb<is_raw_arb>(buf_, bytesRecved_);
207 if (hmbdc_unlikely(a == 0)) {
210 }
else if (hmbdc_unlikely(a < 0)) {
214 while (bytesRecved_) {
216 auto wireSize = h->wireSize();
218 if (hmbdc_likely(bytesRecved_ >= wireSize)) {
219 if (subscriptions_.check(h->topic())) {
220 if (hmbdc_unlikely(wireSize > bytesRecved_)) {
223 auto a = applyArb<is_raw_arb>(h);
225 if (hmbdc_likely(a > 0)) {
226 auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
227 outputBuffer_.put(h->payload(), l);
233 bytesRecved_ -= wireSize;
242 socklen_t addrLen =
sizeof(udpcastListenRemoteAddr);
243 auto l = recvfrom(fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT
244 , (
struct sockaddr *)&udpcastListenRemoteAddr, &addrLen);
245 if (hmbdc_unlikely(l < 0)) {
247 HMBDC_LOG_C(
"recvmmsg failed errno=", errno);
256 }
while(bytesRecved_);
260 OutputBuffer& outputBuffer_;
269 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
272 ,
Client<RecvTransportEngineImpl<OutputBuffer, MsgArbitrator>> {
285 if (runLock_.try_lock()) {
286 RecvTransportEngineImpl::invokedCb(0);
316 HMBDC_LOG_C(e.what());
320 this->runLock_.unlock();
329 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
332 template <
typename OutputBuffer,
typename MsgArbitrator = RecvTransport::NoOpArb>
T getExt(const path_type ¶m, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:226
Definition: MonoLockFreeBuffer.hpp:15
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:46
void handleMessageCb(Unsubscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:165
a singleton that holding udpcast resources
Definition: NetContext.hpp:37
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:19
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
Definition: Messages.hpp:65
void handleMessageCb(Subscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:158
Definition: Message.hpp:78
RecvTransportImpl(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:77
Definition: RecvTransportEngine.hpp:270
impl class
Definition: RecvTransportEngine.hpp:58
Definition: StringTrieSetDetail.hpp:115
void start()
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:137
Definition: Message.hpp:112
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:315
void invokedCb(size_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:296
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
Definition: Messages.hpp:73
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: RecvTransportEngine.hpp:284
Definition: MetaUtils.hpp:9
Definition: LockFreeBufferMisc.hpp:89
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the mesage recv
Definition: RecvTransportEngine.hpp:305