1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/tips/tcpcast/Transport.hpp" 4 #include "hmbdc/tips/tcpcast/RecvSession.hpp" 5 #include "hmbdc/tips/tcpcast/Messages.hpp" 6 #include "hmbdc/tips/tcpcast/DefaultUserConfig.hpp" 7 #include "hmbdc/tips/udpcast/RecvTransportEngine.hpp" 8 #include "hmbdc/app/Logger.hpp" 10 #include <boost/bind.hpp> 11 #include <boost/lexical_cast.hpp> 12 #include <boost/unordered_map.hpp> 18 namespace hmbdc {
namespace tips {
namespace tcpcast {
25 using Transport::Transport;
29 namespace recvtransportengine_detail {
34 template <
typename OutputBuffer,
typename AttachmentAllocator>
39 ,
Client<RecvTransportEngine<OutputBuffer, AttachmentAllocator>> {
52 , OutputBuffer& outputBuffer)
54 , cfg.resetSection(
"rx", false))))
59 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
60 , outputBuffer_(outputBuffer)
61 , maxItemSize_(outputBuffer.maxItemSize())
62 , myIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
63 config_.getExt<
string>(
"ifaceAddr")).c_str())) {
64 if (!config_.getExt<
bool>(
"allowRecvWithinProcess")) {
69 HMBDC_THROW(std::out_of_range
70 ,
"buffer is too small for notification: SessionStarted and SessionDropped");
72 mcConfig_.put(
"outBufferSizePower2", 3u);
73 mcReceiver_.emplace(mcConfig_, buffer_);
74 mcReceiver_->template subscribe<TypeTagSource>();
78 for (
auto& s : recvSessions_) {
79 s.second->heartbeat();
84 schedule(SysTime::now(), *
this);
88 this->checkTimers(time::SysTime::now());
92 using Transport::hmbdcName;
93 using Transport::schedSpec;
102 HMBDC_LOG_C(e.what());
111 for (
auto it = recvSessions_.begin(); it != recvSessions_.end();) {
112 if (hmbdc_unlikely(!(*it).second->runOnce())) {
113 (*it).second->stop();
114 recvSessions_.erase(it++);
120 auto n = buffer_.peek(begin, end);
125 buffer_.wasteAfterPeek(begin, n);
127 mcReceiver_->runOnce(
true);
135 return recvSessions_.size();
138 template <MessageTupleC Messages,
typename CcNode>
139 void subscribeFor(CcNode
const& node, uint16_t mod, uint16_t res) {
140 subscriptions_.addSubsFor<Messages>(node, mod, res);
147 auto ip = inet_addr(t.ip);
153 if (t.srcPid == myPid_ && ip == myIp_)
return;
154 auto key = make_pair(ip, t.port);
155 if (recvSessions_.find(key) == recvSessions_.end()) {
156 for (
auto i = 0u; i < t.typeTagCountContained; ++i) {
157 if (subscriptions_.check(t.typeTags[i])) {
164 sess->start(ip, t.port);
165 recvSessions_[key] =
typename Session::ptr(sess);
166 }
catch (std::exception
const& e) {
167 HMBDC_LOG_C(e.what());
178 Config config_, mcConfig_;
180 OutputBuffer& outputBuffer_;
182 std::optional<udpcast::RecvTransportImpl<decltype(buffer_)>> mcReceiver_;
185 boost::unordered_map<pair<uint64_t, uint16_t>
186 ,
typename Session::ptr> recvSessions_;
192 template <
typename OutputBuffer,
typename AttachmentAllocator>
193 using RecvTransportEngine = recvtransportengine_detail::RecvTransportEngine<OutputBuffer, AttachmentAllocator>;
Definition: MonoLockFreeBuffer.hpp:16
interface to power a tcpcast transport receiving functions
Definition: RecvTransportEngine.hpp:23
class to hold an hmbdc configuration
Definition: Config.hpp:45
void invokedCb(size_t) HMBDC_RESTRICT override
power the io_service and other things
Definition: RecvTransportEngine.hpp:110
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
void handleMessageCb(TypeTagSource const &t)
only used by MD
Definition: RecvTransportEngine.hpp:146
Definition: Transport.hpp:65
this message tipsears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:120
RecvTransportEngine(Config cfg, OutputBuffer &outputBuffer)
ctor
Definition: RecvTransportEngine.hpp:51
Definition: RecvSession.hpp:27
Definition: MessageDispacher.hpp:184
void stoppedCb(std::exception const &e) override
should not htipsen ever unless an exception thrown
Definition: RecvTransportEngine.hpp:101
Definition: Message.hpp:212
Definition: Message.hpp:263
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:122
Definition: RecvTransportEngine.hpp:35
Definition: Timers.hpp:117
this message tipsears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:106
size_t sessionsRemainingActive() const
check how many other parties are sending to this engine
Definition: RecvTransportEngine.hpp:134
Definition: LockFreeBufferMisc.hpp:89