1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/tips/udpcast/Transport.hpp" 4 #include "hmbdc/tips/udpcast/DefaultUserConfig.hpp" 5 #include "hmbdc/tips/TypeTagSet.hpp" 6 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp" 7 #include "hmbdc//MetaUtils.hpp" 9 #include <boost/lexical_cast.hpp> 12 #include <type_traits> 15 namespace hmbdc {
namespace tips {
namespace udpcast {
21 using Transport::Transport;
25 namespace recvtransportengine_detail {
36 template <
typename OutputBuffer>
40 , OutputBuffer& outputBuffer)
43 , outputBuffer_(outputBuffer)
44 , maxItemSize_(outputBuffer.maxItemSize())
45 , buf_(
new char[mtu_])
49 if (setsockopt(fd, SOL_SOCKET,SO_REUSEADDR, &yes,
sizeof(yes)) < 0) {
50 HMBDC_LOG_C(
"failed to set reuse address errno=", errno);
52 auto sz = config_.getExt<
int>(
"udpRecvBufferBytes");
54 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz,
sizeof(sz)) < 0) {
55 HMBDC_LOG_C(
"failed to set send buffer size=", sz);
59 auto udpcastListenPort = config_.getExt<uint16_t>(
"udpcastListenPort");
60 struct sockaddr_in udpcastListenAddrPort;
61 memset(&udpcastListenAddrPort, 0,
sizeof(udpcastListenAddrPort));
62 udpcastListenAddrPort.sin_family = AF_INET;
63 auto ipStr = cfg.
getExt<
string>(
"udpcastListenAddr") ==
string(
"ifaceAddr")
64 ? comm::inet::getLocalIpMatchMask(cfg.
getExt<
string>(
"ifaceAddr")):cfg.
getExt<
string>(
"udpcastListenAddr");
65 udpcastListenAddrPort.sin_addr.s_addr = inet_addr(ipStr.c_str());
66 udpcastListenAddrPort.sin_port = htons(udpcastListenPort);
67 if (::bind(fd, (
struct sockaddr *)&udpcastListenAddrPort,
sizeof(udpcastListenAddrPort)) < 0) {
68 HMBDC_THROW(runtime_error,
"failed to bind unicast udpcast listen address " 69 << ipStr <<
':' << cfg.
getExt<
short>(
"udpcastListenPort") <<
" errno=" << errno);
72 if ((udpcastListenAddrPort.sin_addr.s_addr & 0x000000F0) == 0xE0) {
75 mreq.imr_multiaddr.s_addr = udpcastListenAddrPort.sin_addr.s_addr;
77 comm::inet::getLocalIpMatchMask(config_.getExt<
string>(
"ifaceAddr"));
78 mreq.imr_interface.s_addr=inet_addr(iface.c_str());
79 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0) {
80 HMBDC_THROW(runtime_error,
"failed to join " << ipStr <<
':' 81 << cfg.
getExt<
short>(
"udpcastPort"));
84 HMBDC_LOG_N(
"listen at ", cfg.
getExt<
string>(
"ifaceAddr"));
89 template <MessageTupleC Messages,
typename CcNode>
90 void subscribeFor(CcNode
const& node, uint16_t mod, uint16_t res) {
91 subscriptions_.addSubsFor<Messages>(node, mod, res);
94 template <MessageC Message>
96 subscriptions_.add(Message::typeTag);
107 utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, *
this);
110 void runOnce(
bool alwaysPoll =
false) HMBDC_RESTRICT {
111 if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
112 utils::EpollTask::instance().poll();
117 sockaddr_in udpcastListenRemoteAddr = {0};
119 void resumeRead() HMBDC_RESTRICT {
121 while (bytesRecved_) {
122 auto h =
reinterpret_cast<TransportMessageHeader*
>(bufCur_);
123 auto wireSize = h->wireSize();
125 if (hmbdc_likely(bytesRecved_ >= wireSize)) {
126 if (subscriptions_.check(h->typeTag())) {
127 if (hmbdc_unlikely(wireSize > bytesRecved_)) {
130 auto l = std::min<size_t>(maxItemSize_, h->messagePayloadLen());
131 outputBuffer_.put(h->payload(), l);
133 bytesRecved_ -= wireSize;
142 socklen_t addrLen =
sizeof(udpcastListenRemoteAddr);
143 auto l = recvfrom(fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT
144 , (
struct sockaddr *)&udpcastListenRemoteAddr, &addrLen);
145 if (hmbdc_unlikely(l < 0)) {
147 HMBDC_LOG_C(
"recvmmsg failed errno=", errno);
156 }
while(bytesRecved_);
159 OutputBuffer& outputBuffer_;
164 TypeTagSet subscriptions_;
167 template <
typename OutputBuffer>
170 ,
Client<RecvTransportEngineImpl<OutputBuffer>> {
188 using Transport::hmbdcName;
196 HMBDC_LOG_C(e.what());
204 template <
typename OutputBuffer>
205 using RecvTransportImpl = recvtransportengine_detail::RecvTransportImpl<OutputBuffer>;
207 template <
typename OutputBuffer>
208 using RecvTransportEngine = recvtransportengine_detail::RecvTransportEngineImpl<OutputBuffer>;
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: MonoLockFreeBuffer.hpp:16
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:45
void setAdditionalFallbackConfig(Config const &c)
set additional defaults
Definition: Config.hpp:154
Definition: TypedString.hpp:84
void invokedCb(size_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:184
Config & resetSection(char const *section, bool sectionExists=true)
change section name
Definition: Config.hpp:177
impl class
Definition: RecvTransportEngine.hpp:37
void start()
start the show by scheduling the message recv
Definition: RecvTransportEngine.hpp:106
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception is thrown
Definition: RecvTransportEngine.hpp:195
interface to power a multicast transport receiving functions
Definition: RecvTransportEngine.hpp:20
Definition: RecvTransportEngine.hpp:168
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122