1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/tips/rmcast/Transport.hpp" 4 #include "hmbdc/tips/udpcast/Transport.hpp" 5 #include "hmbdc/app/Logger.hpp" 6 #include "hmbdc/comm/inet/Endpoint.hpp" 7 #include "hmbdc//MetaUtils.hpp" 9 #include <boost/bind.hpp> 10 #include <boost/lexical_cast.hpp> 13 #include <type_traits> 15 namespace hmbdc {
namespace tips {
namespace rmcast {
17 namespace mcrecvtransport_detail {
// Listing fragment: template header, friend declaration and constructor of the
// multicast receive transport. The embedded numbers are the original file's
// line numbers; they jump, so several original lines are absent here. The
// notes below describe only what the visible lines show.
28 template <
typename OutputBuffer,
typename Ep2SessionDict>
32 friend struct NetContext;
37 , Ep2SessionDict& sessionDict)
39 , cmdBuffer_(cmdBuffer)
// The receive buffer is a raw array of mtu_ bytes.
// NOTE(review): no matching delete[] is visible in this extract - confirm one
// exists, or hold the buffer in an owning type.
42 , buf_(
new char[mtu_])
45 , subscriptions_(subscriptions)
46 , sessionDict_(sessionDict) {
// Set SO_REUSEADDR on the multicast socket; a failure is reported through
// HMBDC_LOG_C with errno.
48 if (setsockopt(mcFd_.fd, SOL_SOCKET,SO_REUSEADDR, &yes,
sizeof(yes)) < 0) {
49 HMBDC_LOG_C(
"failed to set reuse address errno=", errno);
// The socket receive buffer size is read from config key "udpRecvBufferBytes".
// Original line 52 is missing here, so any guard around the call below is unknown.
51 auto sz = config_.getExt<
int>(
"udpRecvBufferBytes");
53 if (setsockopt(mcFd_.fd, SOL_SOCKET, SO_RCVBUF, &sz,
sizeof(sz)) < 0) {
// NOTE(review): the call above sets SO_RCVBUF (receive buffer), but the message
// below says "send buffer" - looks like a copy/paste slip; confirm and reword.
54 HMBDC_LOG_C(
"failed to set send buffer size=", sz);
// Tail of the expression that builds mcAddr; its start (original lines 55-59)
// is not visible. The port is read as uint16_t from "mcastPort".
60 , config_.getExt<uint16_t>(
"mcastPort")).v;
// Bind the socket to mcAddr; failure throws runtime_error via HMBDC_THROW.
61 if (::bind(mcFd_.fd, (
struct sockaddr *)&mcAddr,
sizeof(mcAddr)) < 0) {
62 HMBDC_THROW(runtime_error,
"failed to bind " 63 << config_.getExt<
string>(
"mcastAddr") <<
// NOTE(review): this reads from `cfg` while every other lookup uses `config_`,
// and it reads the port as `short` although it is read as uint16_t above.
// Presumably ports above 32767 would be misreported in this message - confirm.
':' 64 << cfg.
getExt<
short>(
"mcastPort"));
// Fill the IP_ADD_MEMBERSHIP request: group address from "mcastAddr".
68 mreq.imr_multiaddr.s_addr=inet_addr(config_.getExt<
string>(
"mcastAddr").c_str());
// Resolve the local interface from "ifaceAddr". The left-hand side (original
// line 69) is missing; presumably the result is stored in `iface`, used below.
70 comm::inet::getLocalIpMatchMask(config_.getExt<
string>(
"ifaceAddr"));
71 mreq.imr_interface.s_addr=inet_addr(iface.c_str());
// Join the multicast group; failure throws runtime_error via HMBDC_THROW.
72 if (setsockopt(mcFd_.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0) {
73 HMBDC_THROW(runtime_error,
"failed to join " << config_.getExt<
string>(
"mcastAddr") <<
// NOTE(review): same `cfg` / `short` inconsistency as in the bind error above.
':' 74 << cfg.
getExt<
short>(
"mcastPort"));
// Only visible statement of start() (original line 85): registers mcFd_ with the
// EpollTask singleton using the EPOLLIN and EPOLLET flags.
85 utils::EpollTask::instance().add(utils::EpollTask::EPOLLIN|utils::EpollTask::EPOLLET, mcFd_);
// Listing fragment that walks the bytes already received (bufCur_ / bytesRecved_)
// one TransportMessageHeader at a time. The enclosing signature and several
// statements, including closing braces and else branches, are missing here.
// Presumably this is part of runOnce() - confirm against the full file.
// Branch taken when sendFrom_ is not an AF_INET address; its body is not visible.
99 if (hmbdc_unlikely(sendFrom_.sin_family != AF_INET)) {
102 if (hmbdc_likely(bytesRecved_)) {
// Continue only while at least one full header remains.
106 while (bytesRecved_ >=
sizeof(TransportMessageHeader)) {
107 auto h =
reinterpret_cast<TransportMessageHeader*
>(bufCur_);
// NOTE(review): wireSize is taken from received bytes. The visible code only
// checks that it does not exceed the bytes left - confirm a zero or undersized
// value cannot stall this loop or under-advance the cursor.
108 auto wireSize = h->wireSize();
109 if (hmbdc_likely(bytesRecved_ >= wireSize)) {
// TypeTagBackupSource messages go to cmdBuffer_: claim a slot, copy the payload
// truncated to cmdBuffer_.maxItemSize(), stamp the sender address, then commit.
// `b` is defined on original line 112, which is missing here.
110 if (hmbdc_unlikely(h->typeTag() == TypeTagBackupSource::typeTag)) {
111 auto it = cmdBuffer_.claim();
// NOTE(review): messagePayloadLen is also wire-supplied and is not visibly
// checked against wireSize before the memcpy - confirm it is validated.
113 size_t l = h->messagePayloadLen;
114 l = std::min(cmdBuffer_.maxItemSize(), l);
115 memcpy(b, h->payload(), l);
116 auto& bts = b->template get<TypeTagBackupSource>();
117 bts.sendFrom = sendFrom_;
118 cmdBuffer_.commit(it);
// Look up the session keyed by sender address and pass it the header.
// Original line 119 is missing; presumably this is the else branch.
120 auto session = sessionDict_.find(sendFrom_);
// NOTE(review): the "session found" case is marked unlikely - confirm the hint
// matches the expected traffic.
121 if (hmbdc_unlikely(session != sessionDict_.end())) {
122 auto a = session->second->accept(h);
// This message's bytes are consumed. The matching bufCur_ advance is not visible.
128 bytesRecved_ -= wireSize;
// Listing fragment of the socket read step: when mcFd_ reports ready, receive up
// to mtu_ bytes into buf_ without blocking and record the sender in sendFrom_.
// The enclosing function's name and the start of the do/while loop are missing.
// NOTE(review): addrLen is deduced from sizeof, so it is std::size_t, yet its
// address is cast to socklen_t* below. Where the two types differ in width,
// recvfrom reads and writes only part of the object - declare it as socklen_t.
137 auto addrLen =
sizeof(sendFrom_);
138 if (mcFd_.isFdReady()) {
139 auto l = recvfrom(mcFd_.fd, buf_, mtu_, MSG_NOSIGNAL|MSG_DONTWAIT
140 , (sockaddr*)&sendFrom_, (socklen_t*)&addrLen);
141 if (hmbdc_unlikely(l < 0)) {
// Log only when mcFd_.checkErr() returns false.
// NOTE(review): the message says "recvmsg" but the failing call is recvfrom.
142 if (!mcFd_.checkErr()) {
143 HMBDC_LOG_C(
"recvmsg failed errno=", errno);
// Loop tail: repeats while bytesRecved_ is non-zero.
152 }
while(bytesRecved_);
// Data members visible in this extract (original lines 158-160 are missing).
// Sender address filled in by recvfrom; also the lookup key into sessionDict_.
156 sockaddr_in sendFrom_;
// Multicast socket wrapper; exposes .fd, isFdReady() and checkErr().
157 udpcast::EpollFd mcFd_;
// Reference supplied through the constructor; not owned by this object.
161 TypeTagSet
const& HMBDC_RESTRICT subscriptions_;
// Reference to the sender-address-to-session dictionary supplied through the
// constructor; not owned by this object.
162 Ep2SessionDict& HMBDC_RESTRICT sessionDict_;
// Alias template that names mcrecvtransport_detail::McRecvTransport with the
// same two template parameters, so callers need not spell the detail namespace.
165 template <
typename OutputBuffer,
typename Ep2SessionDict>
166 using McRecvTransport = mcrecvtransport_detail::McRecvTransport<OutputBuffer, Ep2SessionDict>;
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:238
Definition: MonoLockFreeBuffer.hpp:16
class to hold an hmbdc configuration
Definition: Config.hpp:45
impl class
Definition: McRecvTransport.hpp:29
Definition: Endpoint.hpp:17
Definition: TypedString.hpp:84
void runOnce() HMBDC_RESTRICT
power the io_service and other things
Definition: McRecvTransport.hpp:92
Definition: Message.hpp:212
void start()
start the show by scheduling the message receive
Definition: McRecvTransport.hpp:84
Definition: Transport.hpp:18