1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/tcpcast/Transport.hpp" 4 #include "hmbdc/app/tcpcast/RecvSession.hpp" 5 #include "hmbdc/app/tcpcast/Messages.hpp" 6 #include "hmbdc/app/udpcast/RecvTransportEngine.hpp" 7 #include "hmbdc/app/Logger.hpp" 8 #include "hmbdc/text/StringTrieSet.hpp" 10 #include <boost/bind.hpp> 11 #include <boost/lexical_cast.hpp> 12 #include <boost/unordered_map.hpp> 18 namespace hmbdc {
namespace app {
namespace tcpcast {
25 using ptr = std::shared_ptr<RecvTransport>;
26 using Transport::Transport;
47 virtual void stopListenTo(
comm::Topic const& t) = 0;
51 namespace recvtransportengine_detail {
60 template <
typename OutputBuffer,
typename MsgArbitrator>
65 ,
Client<RecvTransportEngine<OutputBuffer, MsgArbitrator>> {
67 , std::tuple<Subscribe, Unsubscribe, TopicSource>>;
82 , OutputBuffer& outputBuffer
83 , MsgArbitrator arb =
NoOpArb())
91 }), config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
92 , outputBuffer_(outputBuffer)
93 , maxItemSize_(outputBuffer.maxItemSize())
95 , myIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
96 cfg.
getExt<
string>(
"ifaceAddr")).c_str()))
97 , mcastEmuProxyScanTimer_(
98 Duration::seconds(cfg.
getExt<
size_t>(
"mcastEmuProxyScanPeriodSeconds"))) {
101 HMBDC_THROW(std::out_of_range
102 ,
"buffer is too small for notification: SessionStarted and SessionDropped");
104 mcConfig_.put(
"outBufferSizePower2", 3u);
108 if (cfg.
getExt<
bool>(
"useTcpcastMcProxy")) {
109 config_(tcpcastMcEmuProxyAddrs_,
"udpcastDests");
110 if (tcpcastMcEmuProxyAddrs_.size() == 0) {
111 HMBDC_THROW(std::out_of_range,
"no TcpcastEmuProxy specified");
113 topicSinkMsgBufLen_ =
114 udpcast::SendTransport::encode(tcpcastAdTopic_, topicSinkMsgBuf_,
sizeof(topicSinkMsgBuf_)
116 , cfg.
getExt<uint16_t>(
"udpcastListenPort")})->wireSize();
121 mcReceiver_->listenTo(tcpcastAdTopic_);
125 for (
auto& s : recvSessions_) {
126 s.second->heartbeat();
131 schedule(SysTime::now(), *
this);
132 if (tcpcastMcEmuProxyAddrs_.size()) {
133 mcastEmuProxyScanTimer_.setCallback(
135 for (
auto const& s : tcpcastMcEmuProxyAddrs_) {
136 sendto(mcReceiver_->fd, topicSinkMsgBuf_, topicSinkMsgBufLen_
137 , MSG_NOSIGNAL|MSG_DONTWAIT, (
struct sockaddr *)&s.v,
sizeof(s.v));
141 schedule(SysTime::now(), mcastEmuProxyScanTimer_);
153 if (runLock_.try_lock()) {
154 this->checkTimers(time::SysTime::now());
168 using Transport::hmbdcName;
169 using Transport::schedSpec;
178 HMBDC_LOG_C(e.what());
186 void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT
override {
187 for (
auto it = recvSessions_.begin(); it != recvSessions_.end();) {
188 if (hmbdc_unlikely(!(*it).second->runOnce())) {
189 (*it).second->stop();
190 recvSessions_.erase(it++);
196 auto n = buffer_.peek(begin, end);
201 buffer_.wasteAfterPeek(begin, n);
203 mcReceiver_->runOnce(
true);
210 subscriptions_.add(boost::lexical_cast<std::string>(t.topic));
211 for (
auto& p : recvSessions_) {
212 p.second->sendSubscribe(t.topic);
220 subscriptions_.erase(boost::lexical_cast<std::string>(t.topic));
221 for (
auto& p : recvSessions_) {
222 p.second->sendUnsubscribe(t.topic);
230 auto ip = inet_addr(t.ip);
232 if (ip != 0x100007ful
234 && (!t.loopback || t.pid == getpid())) {
237 auto key = make_pair(ip, t.port);
238 if (recvSessions_.find(key) == recvSessions_.end()) {
239 std::regex topicRegex(t.topicRegex);
240 for (
auto it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
241 if (std::regex_match(it->first.c_str(), topicRegex)
252 sess->start(ip, t.port);
253 recvSessions_[key] =
typename Session::ptr(sess);
254 }
catch (std::exception
const& e) {
255 HMBDC_LOG_C(e.what());
263 void listenTo(
Topic const& t)
override {
266 void stopListenTo(
Topic const& t)
override {
270 ~RecvTransportEngine() {
271 this->runLock_.unlock();
277 Config config_, mcConfig_;
279 OutputBuffer& outputBuffer_;
281 unique_ptr<udpcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
284 boost::unordered_map<pair<uint64_t, uint16_t>
285 ,
typename Session::ptr> recvSessions_;
288 std::vector<comm::inet::Endpoint> tcpcastMcEmuProxyAddrs_;
290 char topicSinkMsgBuf_[64];
291 size_t topicSinkMsgBufLen_;
296 template <
typename OutputBuffer,
typename MsgArbitrator>
T getExt(const path_type ¶m, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:225
RecvTransportEngine(Config const &cfg, OutputBuffer &outputBuffer, MsgArbitrator arb=NoOpArb())
ctor
Definition: RecvTransportEngine.hpp:81
Definition: MonoLockFreeBuffer.hpp:15
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: TypedString.hpp:76
Definition: Messages.hpp:73
Definition: Timers.hpp:66
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
Definition: Messages.hpp:66
void handleMessageCb(TopicSource const &t)
only used by MD
Definition: RecvTransportEngine.hpp:229
Definition: Transport.hpp:69
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:145
void handleMessageCb(Unsubscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:219
Definition: Message.hpp:78
Definition: Messages.hpp:154
a singleton that holding tcpcast resources
Definition: NetContext.hpp:44
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: RecvTransportEngine.hpp:152
Definition: RecvSession.hpp:29
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:38
impl class
Definition: RecvTransportEngine.hpp:58
Definition: StringTrieSetDetail.hpp:115
Definition: Message.hpp:112
void stoppedCb(std::exception const &e) override
should not happen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:177
Definition: Messages.hpp:82
void invokedCb(uint16_t threadSerialNumber) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:186
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:23
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
impl class
Definition: RecvTransportEngine.hpp:61
void handleMessageCb(Subscribe const &t)
only used by MD
Definition: RecvTransportEngine.hpp:209
Definition: Timers.hpp:113
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:131
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
start the show by schedule the message recv
Definition: RecvTransportEngine.hpp:164
Definition: LockFreeBufferMisc.hpp:89