hmbdc
simplify-high-performance-messaging-programming
 All Classes Namespaces Functions Variables Friends Pages
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/utils/NetContextUtil.hpp"
4 #include "hmbdc/app/Config.hpp"
5 #include "hmbdc/app/netmap/Messages.hpp"
6 #include "hmbdc/app/netmap/Sender.hpp"
7 #include "hmbdc/app/netmap/SendTransportEngine.hpp"
8 #include "hmbdc/app/netmap/RecvTransportEngine.hpp"
9 #include "hmbdc/app/netmap/DefaultUserConfig.hpp"
10 
11 #include "hmbdc/comm/Topic.hpp"
12 #include "hmbdc/pattern/GuardedSingleton.hpp"
13 
14 #include <unordered_map>
15 #include <map>
16 #include <mutex>
17 #include <memory>
18 #include <vector>
19 
20 
21 /**
22  * @namespace hmbdc::app::netmap
23  * netmap multicast transport
24  */
25 namespace hmbdc { namespace app { namespace netmap {
26 
27 /**
28 * @example client-server-netmap.cpp
29 * @example perf-netmap.cpp
30 */
31 
32 /**
33  * @class NetContext
34  * @brief a singleton that holds netmap resources
35  * @details it manages transport engines
36  * see perf-netmap.cpp for usage
37  */
38 struct NetContext
41 
42  friend struct hmbdc::pattern::SingletonGuardian<NetContext>;
45 
46  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
48 
49 /**
50  * @brief construct a send transport and remember it
51  * @details After this, the user is responsible for getting it started within an hmbdc Context,
52  * or rotated continuously.
53  * it might take several seconds (configured by nmResetWaitSec=2) to construct the engine
54  * to avoid message losses at the beginning - netmap seems to require that
55  *
56  * @param cfgIn JSON specifying the transport - see hmbdc/app/netmap/DefaultUserConfig.hpp
57  * @param maxMessageSize max message size in bytes to be sent
58  * @return a pointer to the Engine
59  */
60  SendTransportEngine* createSendTransportEngine(Config const& cfgIn, size_t maxMessageSize) {
61  Config dft(DefaultUserConfig, "tx");
62  Config cfg(cfgIn);
63  cfg.setDefaultUserConfig(dft);
64 
65  std::lock_guard<std::mutex> tlock(sendTransportEnginesLock_);
66  auto id = cfg.getExt<std::string>("hmbdcName");
67  if (sendTransportEngines_.find(id) != sendTransportEngines_.end()) {
68  HMBDC_THROW(std::runtime_error
69  , "seemingly recreating the same engine, "
70  "same hmbdcNamed engin already created with hmbdcName=" << id);
71  }
72  auto res = new SendTransportEngine(cfg, maxMessageSize);
73  sendTransportEngines_.emplace(id, SendTransportEngine::ptr(res));
74  return res;
75  }
76 
77 /**
78  * @brief same as above but provides a unified interface - not preferred
79  * @return a pointer to the Engine - don't delete it
80  */
82  , size_t maxMessageSize
83  , std::tuple<> args) {
84  return createSendTransportEngine(cfg, maxMessageSize);
85  }
86 
87 /**
88  * @brief construct a recv transport and remember it
89  * @details After this, the user is responsible for getting it started within an hmbdc Context,
90  * or rotated continuously.
91  *
92  * @param cfgIn JSON specifying the transport - see perf-netmap.cpp and hmbdc/app/netmap/DefaultUserConfig.hpp
93  * @param buffer buffer that recv messages go in, normally the one returned by
94  * app::Context::buffer()
95  * @param arb optionally an arbitrator to decide which messages to keep and drop
96  * if arb is an rvalue, it is passed by value, if an lvalue, passed in as reference;
97  * it also supports either netmap packet level (BEFORE topic filtering) or
98  * hmbdc message level (AFTER topic filtering) arbitration depending on which one of
99  * int operator()(void* bytes, size_t len) and
100  * int operator()(TransportMessageHeader const* header) is present in the arb passed in
101 
102  * @return a pointer to the Engine
103  */
// NOTE(review): this listing is missing line 105 (the function's signature line) and
// line 114 (the expression that follows `auto res =`). The member index further down
// gives the signature as:
//   auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer,
//                                  MsgArbitrator &&arb=RecvTransport::NoOpArb())
// The missing expression is not recoverable from this view - restore both lines from
// the real header before compiling.
104  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
106  , Buffer& buffer
107  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
// Copy the caller's config and register the "rx" section of DefaultUserConfig as its
// default user config (presumably a fallback for unset keys - confirm in Config.hpp).
108  Config dft(DefaultUserConfig, "rx");
109  Config cfg(cfgIn);
110  cfg.setDefaultUserConfig(dft);
111 
// Held until return: covers creating the engine and appending it to recvTransportEngines_.
112  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
113  auto res =
// (listing line 114 missing here - see the note at the top of this block)
115  , std::forward<MsgArbitrator>(arb));
// The engine is recorded in recvTransportEngines_ and the same pointer is returned.
116  recvTransportEngines_.emplace_back(res);
117  return res;
118  }
119 
120 /**
121  * @brief same as above but to provide a unified interface - not preferred
122  * @return a pointer to the Engine
123  */
124  template <typename Buffer, typename ArgsTuple>
126  Config const& cfg
127  , Buffer& buffer
128  , ArgsTuple&& args) {
129  return createRecvTransportEngine(cfg
130  , buffer
131  , std::get<0>(args)
132  );
133  }
134 
135 /**
136  * @brief get (or create for the first time) a Sender - whose function is to send messages on
137  * its associated Topic
138  * @details this operation typically might be slow, so caching the return value is recommended.
139  *
140  * @param t - the Topic that the Sender is for
141  */
143  std::lock_guard<std::mutex> lock(sendersLock_);
144  auto sender = senders_.find(t);
145  if ( sender != senders_.end()) {
146  return sender->second.get();
147  } else {
148  std::lock_guard<std::mutex> slock(sendTransportEnginesLock_);
149  for (auto& s : sendTransportEngines_) {
150  auto st = s.second;
151  if (st->match(t)) {
152  auto newSender = new Sender(st, t);
153  senders_[t].reset(newSender);
154  return newSender;
155  }
156  }
157  }
158 
159  return nullptr;
160  }
161 
162 /**
163  * @brief This process is interested in a Topic
164  * @details Normally the receiving transport covering this topic
165  * needs to be created - not necessarily running - before calling this
166  *
167  * @param t Topic interested
168  */
169  void listenTo(comm::Topic const& t) {
170  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
171  for (auto r : recvTransportEngines_) {
172  r->listenTo(t);
173  }
174  }
175 
176 /**
177  * @brief undo the subscription
178  *
179  * @param t Topic
180  */
181  void stopListenTo(comm::Topic const& t) {
182  std::lock_guard<std::mutex> tlock(recvTransportEnginesLock_);
183  for (auto r : recvTransportEngines_) {
184  r->stopListenTo(t);
185  }
186  }
187 
188 private:
189 /**
190  * @brief this ctor is to create some commonly used basic engine setup to start with.
191  * This is private and only meant to be used through SingletonGuardian,
192  * see in example chat.cpp
193  * @snippet chat.cpp create NetContext with initial tx/rx capabilities
194  * @details It creates one send transport engine and/or one recv transport engine based on cfgIn.
195  * and run them using a single OS thread indicated by runningUsingThreadIndex
196  * If recv engine is created, this method also automatically makes the NetContext listen to
197  * a default topic (listenToTopic).
198  *
199  * @param ctx a Context that manages the send / recv transport engines, and hold the received messages
200  * @param cfgIn optional Config for the send AND recv transport engines
201  * @param sendSec the section name for send transport engine in above cfgIn, special values:
202  * nullptr - create send engine using no section config values
203  * "" - do not create send engine
204  * @param recvSec the section name for recv transport engine in above cfgIn
205  * nullptr - create recv engine using no section config values
206  * "" - do not create recv engine
207  * @param maxMessageSize max message size in bytes to be sent, if 0, uses ctx's maxMessageSize()
208  * @param runningUsingThreadIndex see details above
209  * @tparam CcContext ctx Context type
210  */
211  template <typename CcContext>
212  NetContext(CcContext& ctx
213  , Config::Base cfgIn = Config::Base{}
214  , char const* sendSec = nullptr
215  , char const* recvSec = nullptr
216  , size_t maxMessageSize = 0
217  , uint8_t runningUsingThreadIndex = 0
218  , char const* listenToTopic = "_")
219  : runningCtx(0u, 2ul) {
220  createMinimumNetContext(
221  *this, runningCtx, ctx, cfgIn, sendSec, recvSec, maxMessageSize, runningUsingThreadIndex, listenToTopic);
222  }
223 
224 /**
225  * @brief this is for users that want finer control of engine creation - from a blank NetContext.
226  * @details it does not do anything that the previous ctor does
227  */
229  }
230 
231  ~NetContext() {
232  }
233 
234 
235  std::unordered_map<std::string, SendTransportEngine::ptr> sendTransportEngines_;
236  std::mutex sendTransportEnginesLock_;
237 
238  std::vector<RecvTransport::ptr> recvTransportEngines_;
239  std::mutex recvTransportEnginesLock_;
240 
241  std::map<comm::Topic, Sender::ptr> senders_;
242  uint32_t sendTransportRotationIndex_;
243  std::mutex sendersLock_;
244  Context<> runningCtx;
245 };
246 
247 }
248 
249 }}
250 
void setDefaultUserConfig(Config const &c)
deprecated
Definition: Config.hpp:165
class to hold an hmbdc configuration
Definition: Config.hpp:46
NetContext()
this is for users that want finer control of engine creation - from a blank NetContext.
Definition: NetContext.hpp:228
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
facade class for sending network messages
Definition: Sender.hpp:11
Definition: NetContextUtil.hpp:10
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:53
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:169
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, std::tuple<> args)
same as above but provide an unified interface - not preferred
Definition: NetContext.hpp:81
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:225
NetContext(CcContext &ctx, Config::Base cfgIn=Config::Base{}, char const *sendSec=nullptr, char const *recvSec=nullptr, size_t maxMessageSize=0, uint8_t runningUsingThreadIndex=0, char const *listenToTopic="_")
this ctor is to create some commonly used basic engine setup to start with. This is private and only ...
Definition: NetContext.hpp:212
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:125
power a netmap port sending functions
Definition: SendTransportEngine.hpp:44
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport and remember it
Definition: NetContext.hpp:60
impl class,
Definition: RecvTransportEngine.hpp:70
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:477
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport and remember it
Definition: NetContext.hpp:105
char const *const DefaultUserConfig
Definition: DefaultUserConfig.hpp:10
a singleton that holding netmap resources
Definition: NetContext.hpp:38
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:50
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:181
Sender * getSender(comm::Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:142