hmbdc
simplify-high-performance-messaging-programming
 All Classes Namespaces Functions Variables Friends Pages
NetContext.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/utils/NetContextUtil.hpp"
4 #include "hmbdc/app/Config.hpp"
5 #include "hmbdc/app/tcpcast/Messages.hpp"
6 #include "hmbdc/app/tcpcast/Sender.hpp"
7 #include "hmbdc/app/tcpcast/SendTransportEngine.hpp"
8 #include "hmbdc/app/tcpcast/RecvTransportEngine.hpp"
9 #include "hmbdc/app/tcpcast/DefaultUserConfig.hpp"
10 
11 #include "hmbdc/comm/Topic.hpp"
12 #include "hmbdc/pattern/GuardedSingleton.hpp"
13 
14 #include <unordered_map>
15 #include <map>
16 #include <mutex>
17 #include <memory>
18 #include <vector>
19 
20 
21 /**
22  * @namespace hmbdc::app::tcpcast
23  * reliable tcpcast (based on TCP) transport
24  */
25 
26 namespace hmbdc { namespace app { namespace tcpcast {
27 
28 
29 /**
30 * @example chat.cpp
31 * @example perf-tcpcast.cpp
32 * @example rmcast-cp.cpp
33 * @example ping-pong-tcpcast.cpp
34 */
35 
36 /**
37  * @class NetContext
38  * @brief a singleton that holds tcpcast resources
39  * @details it manages transport engines
40  *
41  * see perf-tcpcast.cpp chat.cpp for usage.
42  *
43  */
44 struct NetContext
47 
48  friend struct hmbdc::pattern::SingletonGuardian<NetContext>;
51 
52  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
54 
55 /**
56  * @brief construct a send transport engine (and remember it)
57  * @details After this, the user is responsible for getting it started within a hmbdc Context,
58  * or rotated continuously.
59  * Don't create the same thing twice.
60  *
61  * @param cfgIn JSON specifying the transport - see perf-tcpcast.cpp and hmbdc/app/tcpcast/DefaultUserConfig.hpp
62  * @param maxMessageSize max message size in bytes to be sent
63  * hold the message in buffer
64  * @return a pointer to the Engine
65  */
67  Config const& cfgIn
68  , size_t maxMessageSize) {
69  Config dft(DefaultUserConfig, "tx");
70  Config cfg(cfgIn);
71  cfg.setDefaultUserConfig(dft);
72 
73  std::lock_guard<std::mutex> tlock(sendTransportEnginesLock_);
74  auto id = cfg.getExt<std::string>("hmbdcName");
75  if (sendTransports_.find(id) != sendTransports_.end()) {
76  HMBDC_THROW(std::runtime_error
77  , "seemingly recreating the same engine, "
78  "same hmbdcNamed engin already created with hmbdcName=" << id);
79  }
80  auto res = new SendTransportEngine(cfg, maxMessageSize);
81  sendTransports_.emplace(id, SendTransport::ptr(res));
82  return res;
83  }
84 
85 /**
86  * @brief same as above but provides a unified interface - not preferred
87  * @return a pointer to the Engine - don't delete it
88  */
90  , size_t maxMessageSize
91  , std::tuple<size_t> args) {
92  return createSendTransportEngine(cfg, maxMessageSize);
93  }
94 
95 /**
96  * @brief construct a recv transport and remember it
97  * @details After this, the user is responsible for getting it started within a hmbdc Context,
98  * or rotated continuously.
99  *
100  * @param cfgIn JSON specifying the transport - see perf-tcpcast.cpp and hmbdc/app/tcpcast/DefaultUserConfig.hpp
101  * @param buffer buffer that received messages go into, normally the one returned by app::Context::buffer()
102  * @param arb optionally an arbitrator to decide which messages to keep and drop
103  * if arb is an rvalue, it is passed in by value, if an lvalue, passed in as reference;
104  * it supports ONLY hmbdc message level (AFTER topic filtering) arbitration if
105  * int operator()(TransportMessageHeader const* header) is present in the arb passed in.
106  * (NO packet level since it is tcp)
107  * @return a pointer to the Engine
108  */
109  template <typename Buffer, typename MsgArbitrator = RecvTransport::NoOpArb>
111  , Buffer& buffer
112  , MsgArbitrator&& arb = RecvTransport::NoOpArb()) {
113  Config dft(DefaultUserConfig, "rx");
114  Config cfg(cfgIn);
115  cfg.setDefaultUserConfig(dft);
116 
117  std::lock_guard<std::mutex> tlock(recvTransportsLock_);
119  cfg, buffer, std::forward<MsgArbitrator>(arb));
120  recvTransports_.emplace_back(res);
121  return res;
122  }
123 
124 /**
125  * @brief same as above but to provide a unified interface - not preferred
126  * @details use forward_as_tuple to make the tuple passed in
127  * @return a pointer to the Engine
128  */
129  template <typename Buffer, typename ArgsTuple>
131  Config const& cfg
132  , Buffer& buffer
133  , ArgsTuple&& args) {
134  return createRecvTransportEngine(cfg
135  , buffer
136  , std::get<0>(args)
137  );
138  }
139 
140 /**
141  * @brief get (or create for the first time) a Sender - whose function is to send messages on
142  * its associated Topic
143  * @details this operation typically might be slow, so caching the return value is recommended.
144  *
145  * @param t - the Topic that the Sender is for
146  */
148  std::lock_guard<std::mutex> lock(sendersLock_);
149  auto sender = senders_.find(t);
150  if ( sender != senders_.end()) {
151  return sender->second.get();
152  } else {
153  std::lock_guard<std::mutex> slock(sendTransportEnginesLock_);
154  for (auto& s : sendTransports_) {
155  auto st = s.second;
156  if (st->match(t)) {
157  auto newSender = new Sender(st, t);
158  senders_[t].reset(newSender);
159  return newSender;
160  }
161  }
162  }
163 
164  return nullptr;
165  }
166 
167 /**
168  * @brief This process is interested in a Topic
169  * @details Normally the receiving transport covering this topic
170  * needs to be created - not necessarily running - before calling this
171  *
172  * @param t Topic interested
173  */
174  void listenTo(comm::Topic const& t) {
175  std::lock_guard<std::mutex> tlock(recvTransportsLock_); // held until return, so the loop below runs entirely under recvTransportsLock_
176  for (auto r : recvTransports_) { // NOTE(review): `auto r` copies each stored RecvTransport::ptr per iteration; `auto const&` would avoid that - confirm ptr type
177  r->listenTo(t); // forward the subscription to each element currently in recvTransports_
178  }
179  }
180 
181 /**
182  * @brief undo the subscription
183  *
184  * @param t Topic
185  */
186  void stopListenTo(comm::Topic const& t) {
187  std::lock_guard<std::mutex> tlock(recvTransportsLock_); // held until return, so the loop below runs entirely under recvTransportsLock_
188  for (auto r : recvTransports_) { // NOTE(review): `auto r` copies each stored RecvTransport::ptr per iteration; `auto const&` would avoid that - confirm ptr type
189  r->stopListenTo(t); // forward the un-subscription to each element currently in recvTransports_
190  }
191  }
192 
193 private:
194 /**
195  * @brief this ctor is to create some commonly used basic engine setup to start with.
196  * This is private and only meant to be used through SingletonGuardian,
197  * see in example chat.cpp
198  * @snippet chat.cpp create NetContext with initial tx/rx capabilities
199  * @details It creates one send transport engine and/or one recv transport engine based on cfgIn,
200  * and runs them using a single OS thread indicated by runningUsingThreadIndex.
201  * If a recv engine is created, this method also automatically makes the NetContext listen to
202  * a default topic (listenToTopic).
203  *
204  * @param ctx a Context that manages the send / recv transport engines, and holds the received messages
205  * @param cfgIn optional Config for the send AND recv transport engines
206  * @param sendSec the section name for send transport engine in above cfgIn, special values:
207  * nullptr - create send engine using no section config values
208  * "" - do not create send engine
209  * @param recvSec the section name for recv transport engine in above cfgIn
210  * nullptr - create recv engine using no section config values
211  * "" - do not create recv engine
212  * @param maxMessageSize max message size in bytes to be sent, if 0, uses ctx's maxMessageSize()
213  * @param runningUsingThreadIndex see details above
    * @param listenToTopic the default topic to listen to when a recv engine is created (defaults to "_")
214  * @tparam CcContext ctx Context type
215  */
216  template <typename CcContext>
217  NetContext(CcContext& ctx
218  , Config::Base cfgIn = Config::Base{}
219  , char const* sendSec = nullptr
220  , char const* recvSec = nullptr
221  , size_t maxMessageSize = 0
222  , uint8_t runningUsingThreadIndex = 0
223  , char const* listenToTopic = "_")
224  : runningCtx(0u, 2ul) { // runningCtx is this object's own Context<> member, separate from the caller-supplied ctx
225  checkEpollTaskInitialization(); // declared outside this file; presumably validates epoll task setup - TODO confirm
226  createMinimumNetContext( // helper declared outside this file; presumably performs the setup described above - TODO confirm
227  *this, runningCtx, ctx, cfgIn, sendSec, recvSec, maxMessageSize, runningUsingThreadIndex, listenToTopic);
228  }
229 
230 /**
231  * @brief this is for users that want finer control of engine creation - from a blank NetContext.
232  * @details it does not do anything that the previous ctor does
233  */
235  checkEpollTaskInitialization();
236  }
237 
238  ~NetContext() { // empty body: no explicit teardown code is written here
239  }
240 
241 
242  std::unordered_map<std::string, SendTransport::ptr> sendTransports_;
243  std::mutex sendTransportEnginesLock_;
244 
245  std::vector<RecvTransport::ptr> recvTransports_;
246  std::mutex recvTransportsLock_;
247 
248  std::map<comm::Topic, Sender::ptr> senders_;
249  std::mutex sendersLock_;
250  Context<> runningCtx;
251 };
252 
253 } //tcpcast
254 
255 }}
256 
void setDefaultUserConfig(Config const &c)
deprecated
Definition: Config.hpp:165
class to hold an hmbdc configuration
Definition: Config.hpp:46
NetContext()
this is for users that want finer control of engine creation - from a blank NetContext.
Definition: NetContext.hpp:234
facade class for sending network messages
Definition: Sender.hpp:12
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
NetContext(CcContext &ctx, Config::Base cfgIn=Config::Base{}, char const *sendSec=nullptr, char const *recvSec=nullptr, size_t maxMessageSize=0, uint8_t runningUsingThreadIndex=0, char const *listenToTopic="_")
this ctor is to create some commonly used basic engine setup to start with. This is private and only ...
Definition: NetContext.hpp:217
Definition: NetContextUtil.hpp:10
SendTransportEngine * createSendTransportEngine(Config const &cfgIn, size_t maxMessageSize)
construct a send transport engine (and remember it)
Definition: NetContext.hpp:66
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:53
RAII representing the lifespan of the underlying Singleton which also guarantees the singularity of u...
Definition: GuardedSingleton.hpp:20
auto createRecvTransportEngine(Config const &cfgIn, Buffer &buffer, MsgArbitrator &&arb=RecvTransport::NoOpArb())
construct a recv transport and remember it
Definition: NetContext.hpp:110
T getExt(const path_type &param, bool throwIfMissing=true) const
get a value from the config
Definition: Config.hpp:225
void listenTo(comm::Topic const &t)
This process is interested in a Topic.
Definition: NetContext.hpp:174
Definition: SendTransportEngine.hpp:139
SendTransportEngine * createSendTransportEngineTuply(Config const &cfg, size_t maxMessageSize, std::tuple< size_t > args)
same as above but provides a unified interface - not preferred
Definition: NetContext.hpp:89
a singleton that holding tcpcast resources
Definition: NetContext.hpp:44
void stopListenTo(comm::Topic const &t)
undo the subscription
Definition: NetContext.hpp:186
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:477
a take all arbitrator (no arbitration at all)
Definition: RecvTransportEngine.hpp:38
auto createRecvTransportEngineTuply(Config const &cfg, Buffer &buffer, ArgsTuple &&args)
same as above but to provide a unified interface - not preferred
Definition: NetContext.hpp:130
Sender * getSender(comm::Topic const &t)
get (or create for the first time) a Sender - whose function is to send messages on its associated To...
Definition: NetContext.hpp:147
impl class
Definition: RecvTransportEngine.hpp:61