hmbdc
simplify-high-performance-messaging-programming
 All Classes Namespaces Functions Variables Friends Pages
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/utils/NetContextUtil.hpp"
4 #include "hmbdc/app/Config.hpp"
5 #include "hmbdc/app/udpcast/Messages.hpp"
6 #include "hmbdc/app/udpcast/Sender.hpp"
7 #include "hmbdc/app/udpcast/SendTransportEngine.hpp"
8 #include "hmbdc/app/udpcast/RecvTransportEngine.hpp"
9 #include "hmbdc/app/udpcast/DefaultUserConfig.hpp"
10 #include "hmbdc/comm/Topic.hpp"
11 #include "hmbdc/pattern/GuardedSingleton.hpp"
12 
13 #include <unordered_map>
14 #include <map>
15 #include <mutex>
16 #include <memory>
17 #include <vector>
18 
19 /**
20  * @namespace hmbdc::app::udpcast
21  * UDP multicast/unicast transport
22  */
23 namespace hmbdc { namespace app { namespace udpcast {
24 
25 /**
26 * @example server-cluster.cpp
27 * @example perf-udpcast.cpp
28 * @example ping-pong-udpcast.cpp
29 * @example udpcast-sniff.cpp
30 */
31 
32 /**
33  * @class NetContext
34  * @brief a singleton that holding udpcast resources
35  * @details it manages transport engines
36  */
37 struct NetContext
40 
41  friend struct hmbdc::pattern::SingletonGuardian<NetContext>;
44 
45  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
47 
48 
49 /**
50  * @brief construct a send transport engine (and remember it within the class)
51  * @details After this, user is responsible to get it started within a hmbdc Context,
52  * or rotated continously.
53  * Don't create the same thing twice
54  *
55  * @param cfgIn config specifing the transport - hmbdc/app/udpcast/DefaultUserConfig.hpp
56  * @param maxMessageSize max messafe size in bytes to be sent
57  * trasnport engines, see ctor of SendTransportEngine for details
58  * @return a pointer to the Engine - don't delete it
59  */
61  , size_t maxMessageSize) {
62  Config dft(DefaultUserConfig, "tx");
63  Config cfg(cfgIn);
64  cfg.setDefaultUserConfig(dft);
65 
66  std::lock_guard<std::mutex> tlock(sendTransportsLock_);
67  auto id = cfg.getExt<std::string>("hmbdcName");
68  if (sendTransports_.find(id) != sendTransports_.end()) {
69  HMBDC_THROW(std::runtime_error
70  , "seemingly recreating the same engine, "
71  "same hmbdcNamed engin already created with hmbdcName=" << id);
72  }
73  auto res = new SendTransportEngine(cfg, maxMessageSize);
74  sendTransports_.emplace(id, SendTransport::ptr(res));
75  return res;
76  }
77 
78 /**
79  * @brief same as above but provide a unified interface - not preferred
80  * @return a pointer to the Engine - don't delete it
81  */
83  , size_t maxMessageSize
84  , std::tuple<> args) {
85  return createSendTransportEngine(cfg, maxMessageSize);
86  }
87 
88 /**
89  * @brief construct a recv transport and remember it
90  * @details After this, user is responsible to get it started within a hmbdc Context,
91  * or rotated continously.
92  *
93  * @param cfgIn jason specifing the transport - see hmbdc/app/udpcast/DefaultUserConfig.hpp
94  * @param buffer buffer that recv messages go, normally the one returned by app::Context::buffer()
95  * @param arb optonally an arbitrator to decide which messages to keep and drop
96  * if arb is an rvalue, it is passed in value, if an lvalue, passed in as reference;
97  * it supports either udp packet (BEFORE topic filtering) or
98  * hmbdc message (AFTER topic filtering) level arbitration depending on which one of
99  * int operator()(void* bytes, size_t len) and
100  * int operator()(TransportMessageHeader const* header) presents in the arb passed in
101  * trasnport engines, see ctor of RecvTransportEngineImpl for details
102  * see in example use in udpcast-sniff.cpp @snippet udpcast-sniff.cpp define packet arb
103  * see in example use in server-cluster.cpp @snippet server-cluster.cpp define msg arbitrator
104  * @return a pointer to the Engine
105  */
106  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
108  , Buffer& buffer
109  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
110  Config dft(DefaultUserConfig, "rx");
111  Config cfg(cfgIn);
112  cfg.setDefaultUserConfig(dft);
113 
114  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
115  auto res =
117  , std::forward<MsgArbitrator>(arb));
118  recvTransports_.emplace_back(res);
119  return res;
120  }
121 
122 /**
123  * @brief same as above but to provide a unified interface - not preferred
124  * @details use forward_as_tuple to make the tuple passed in
125  * @return a pointer to the Engine
126  */
127  template <typename Buffer, typename ArgsTuple>
129  Config const& cfg
130  , Buffer& buffer
131  , ArgsTuple&& args) {
132  return createRecvTransportEngine(cfg
133  , buffer
134  , std::get<0>(args)
135  );
136  }
137 
138 /**
139  * @brief get (or create for the first time) a Sender - whose function is to send messages on
140  * its associated Topic
141  * @details this operation typically might be slow, so caching the return value is recommended.
142  *
143  * @param t - the Topic that the Sender is for - default to be "_"
144  */
145  Sender* getSender(Topic const& t = Topic("_")) {
146  std::lock_guard<std::mutex> lock(sendersLock_);
147  auto sender = senders_.find(t);
148  if ( sender != senders_.end()) {
149  return sender->second.get();
150  } else {
151  std::lock_guard<std::mutex> slock(sendTransportsLock_);
152  for (auto& s : sendTransports_) {
153  auto st = s.second;
154  if (st->match(t)) {
155  auto newSender = new Sender(st, t);
156  senders_[t].reset(newSender);
157  return newSender;
158  }
159  }
160  }
161 
162  return nullptr;
163  }
164 
165 /**
166  * @brief This process is interested in a Topic
167  * @details Normally the receiving transport covering this topic
168  * needs to be created - not necessarily running - before calling this
169  *
170  * @param t Topic interested
171  */
172  void listenTo(Topic const& t) {
173  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
174  for (auto r : recvTransports_) {
175  r->listenTo(t);
176  }
177  }
178 
179 /**
180  * @brief undo the subscription
181  *
182  * @param t Topic
183  */
184  void stopListenTo(Topic const& t) {
185  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
186  for (auto r : recvTransports_) {
187  r->stopListenTo(t);
188  }
189  }
190 
191 private:
192 /**
193  * @brief this ctor is to create some commonly used basic engine setup to start with.
194  * This is private and only meant to be used through SingletonGuardian,
195  * see in example chat.cpp
196  * @snippet chat.cpp create NetContext with initial tx/rx capabilities
197  * @details It creates one send transport engine and/or one recv transport engine based on cfgIn.
198  * and run them using a single OS thread indicated by runningUsingThreadIndex
199  * If recv engine is created, this method also automaticallly makes the NetContext listen to
200  * a default topic (listenToTopic).
201  *
202  * @param ctx a Context that manages the send / recv transport engines, and hold the received messages
203  * @param cfgIn optional Config for the send AND recv transport engines
204  * @param sendSec the section name for send transport engine in above cfgIn, special values:
205  * nullptr - create send engine using no section config values
206  * "" - do not create send engine
207  * @param recvSec the section name for recv transport engine in above cfgIn
208  * nullptr - create recv engine using no section config values
209  * "" - do not create recv engine
210  * @param maxMessageSize max message size in bytes to be sent, if 0, uses ctx's maxMessageSize()
211  * @param runningUsingThreadIndex see details above
212  * @tparam CcContext ctx Context type
213  */
214  template <typename CcContext>
215  NetContext(CcContext& ctx
216  , Config::Base cfgIn = Config::Base{}
217  , char const* sendSec = nullptr
218  , char const* recvSec = nullptr
219  , size_t maxMessageSize = 0
220  , uint8_t runningUsingThreadIndex = 0
221  , char const* listenToTopic = "_")
222  : runningCtx(0u, 2ul) {
223  checkEpollTaskInitialization();
224  createMinimumNetContext(
225  *this, runningCtx, ctx, cfgIn, sendSec, recvSec, maxMessageSize, runningUsingThreadIndex, listenToTopic);
226  }
227 
228 /**
229  * @brief this is for users that want finer control of engine creation - from a blank NetContext.
230  * @details it does not do anything that the previous ctor does
231  */
233  checkEpollTaskInitialization();
234  }
235 
236  ~NetContext() {
237  }
238 
239  std::unordered_map<std::string, SendTransport::ptr> sendTransports_;
240  std::mutex sendTransportsLock_;
241 
242  std::vector<RecvTransport::ptr> recvTransports_;
243  std::mutex recvTransportsLock_;
244 
245  std::map<Topic, Sender::ptr> senders_;
246  std::mutex sendersLock_;
248 };
249 
250 }
251 }}
252 
void setDefaultUserConfig(Config const &c)
depracated
Definition: Config.hpp:165
class to hold an hmbdc configuration
Definition: Config.hpp:46
a singleton that holding udpcast resources
Definition: NetContext.hpp:37
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
NetContext()
this is for users that want finer control of engine creation - from a blank NetContext.
Definition: NetContext.hpp:232
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:33
void listenTo(Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:172
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport engine (and remember it within the class)
Definition: NetContext.hpp:60
Definition: NetContextUtil.hpp:10
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:53
RAII representing the lifespan of the underlying Singleton which also ganrantees the singularity of u...
Definition: GuardedSingleton.hpp:20
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport and remember it
Definition: NetContext.hpp:107
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:225
fascade class for sending network messages
Definition: Sender.hpp:15
void stopListenTo(Topic const &t)
undo the subscription
Definition: NetContext.hpp:184
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, std::tuple<> args)
same as above but provide a unified interface - not preferred
Definition: NetContext.hpp:82
NetContext(CcContext &ctx, Config::Base cfgIn=Config::Base{}, char const *sendSec=nullptr, char const *recvSec=nullptr, size_t maxMessageSize=0, uint8_t runningUsingThreadIndex=0, char const *listenToTopic="_")
this ctor is to create some commonly used basic engine setup to start with. This is private and only ...
Definition: NetContext.hpp:215
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:128
Sender * getSender(Topic const &t=Topic("_"))
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:145