1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/tips/rnetmap/Transport.hpp" 4 #include "hmbdc/tips/rnetmap/NmRecvTransport.hpp" 5 #include "hmbdc/tips/rnetmap/BackupRecvSession.hpp" 6 #include "hmbdc/tips/rnetmap/DefaultUserConfig.hpp" 7 #include "hmbdc/tips/reliable/AttBufferAdaptor.hpp" 8 #include "hmbdc/app/Logger.hpp" 9 #include "hmbdc/comm/inet/Hash.hpp" 11 #include <boost/bind.hpp> 12 #include <boost/lexical_cast.hpp> 13 #include <boost/unordered_map.hpp> 17 #include <type_traits> 20 namespace hmbdc {
namespace tips {
namespace rnetmap {
30 namespace recvtransportengine_detail {
36 template <
typename OutputBuffer,
typename AttachmentAllocator>
41 friend struct NetContext;
43 , OutputBuffer& outputBuffer)
46 , config_.getExt<uint16_t>(
"cmdBufferSizePower2"))
47 , outputBuffer_(outputBuffer)
48 , myBackupIp_(inet_addr(hmbdc::comm::inet::getLocalIpMatchMask(
49 config_.getExt<
string>(
"tcpIfaceAddr") ==
string(
"ifaceAddr")
50 ?config_.getExt<
string>(
"ifaceAddr"):config_.getExt<
string>(
"tcpIfaceAddr")
52 , nmRecvTransport_(config_
56 , loopback_(config_.getExt<
bool>(
"loopback")) {
57 subscriptions_.addAll<std::tuple<app::StartMemorySegTrain, app::MemorySeg>>();
58 if (!config_.getExt<
bool>(
"allowRecvWithinProcess")) {
63 void runOnce() HMBDC_RESTRICT {
64 utils::EpollTask::instance().poll();
66 auto n = cmdBuffer_.peek(begin, end);
71 cmdBuffer_.wasteAfterPeek(begin, n);
72 nmRecvTransport_.runOnce();
73 for (
auto it = recvSessions_.begin(); it != recvSessions_.end();) {
74 if (hmbdc_unlikely(!it->second->runOnce())) {
77 ep2SessionDict_.erase(it->second->sendFrom);
78 recvSessions_.erase(it++);
89 auto ip = inet_addr(t.ip);
95 if (t.srcPid == myPid_ && ip == myBackupIp_)
return;
96 auto key = make_pair(ip, t.port);
97 if (recvSessions_.find(key) == recvSessions_.end()) {
98 for (
auto i = 0u; i < t.typeTagCountContained; ++i) {
99 if (subscriptions_.check(t.typeTags[i])) {
101 typename Session::ptr sess {
110 sess->start(ip, t.port);
111 recvSessions_[key] = sess;
112 ep2SessionDict_[t.sendFrom] = sess;
113 schedule(SysTime::now(), *sess);
114 }
catch (std::exception
const& e) {
115 HMBDC_LOG_C(e.what());
128 return recvSessions_.size();
131 template <MessageTupleC Messages,
typename CcNode>
132 void subscribeFor(CcNode
const& node, uint16_t mod, uint16_t res) {
133 subscriptions_.addSubsFor<Messages>(node, mod, res);
140 OutputBuffer& outputBuffer_;
142 uint64_t myBackupIp_;
144 = boost::unordered_map<uint32_t, typename Session::ptr>;
145 Ep2SessionDict ep2SessionDict_;
146 boost::unordered_map<pair<uint64_t, uint16_t>
147 ,
typename Session::ptr> recvSessions_;
153 template <
typename OutputBuffer,
typename AttachmentAllocator>
156 ,
Client<RecvTransportEngine<OutputBuffer, AttachmentAllocator>> {
161 this->checkTimers(time::SysTime::now());
172 HMBDC_LOG_C(e.what());
175 using EngineTransport::hmbdcName;
177 std::tuple<char const*, int> schedSpec()
const {
178 return std::make_tuple(this->schedPolicy_.c_str(), this->schedPriority_);
/// Alias template that names recvtransportengine_detail::RecvTransportEngine
/// as RecvTransportEngine, forwarding both template parameters unchanged.
/// NOTE(review): presumably declared in the enclosing rnetmap namespace so that
/// users do not spell the detail namespace - the namespace closers are not
/// visible in this extract; confirm against the full header.
183 template <
typename OutputBuffer,
typename AttachmentAllocator>
184 using RecvTransportEngine = recvtransportengine_detail::RecvTransportEngine<
185 OutputBuffer, AttachmentAllocator>;
Definition: MonoLockFreeBuffer.hpp:16
Definition: RecvTransportEngine.hpp:154
class to hold an hmbdc configuration
Definition: Config.hpp:45
void setAdditionalFallbackConfig(Config const &c)
set additional defaults
Definition: Config.hpp:154
Definition: TypedString.hpp:84
Definition: RecvTransportEngine.hpp:37
Definition: Timers.hpp:70
Config & resetSection(char const *section, bool sectionExists=true)
change section name
Definition: Config.hpp:177
impl class
Definition: NmRecvTransport.hpp:31
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: RecvTransportEngine.hpp:171
Definition: Transport.hpp:39
void handleMessageCb(TypeTagBackupSource const &t)
only used by MD
Definition: RecvTransportEngine.hpp:88
Definition: MessageDispacher.hpp:184
Definition: Message.hpp:212
Definition: Messages.hpp:116
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: RecvTransportEngine.hpp:166
Definition: Message.hpp:263
Definition: RecvTransportEngine.hpp:21
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:122
size_t sessionsRemainingActive() const
check how many other parties are sending to this engine
Definition: RecvTransportEngine.hpp:127
Definition: BackupRecvSessionT.hpp:38
Definition: LockFreeBufferMisc.hpp:89