hmbdc
simplify-high-performance-messaging-programming
NetPortal.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Message.hpp"
4 #include "hmbdc/pattern/GuardedSingleton.hpp"
5 
6 namespace hmbdc { namespace app { namespace utils {
7 
8 namespace netportal_detail {
9 template <typename MesssageRecver, MessageC... RecvMessages>
10 struct Proxy : Client<Proxy<MesssageRecver, RecvMessages...>, RecvMessages ...> {
11  Proxy(MesssageRecver&& recver)
12  : recver(std::forward<MesssageRecver>(recver)){}
13  MesssageRecver&& recver;
14 
15  template <MessageC Message>
16  void handleMessageCb(Message&& message) {
17  count++;
18  recver.handleMessageCb(std::forward<Message>(message));
19  }
20  size_t count = 0;
21 };
22 } //detail
23 
/**
 * @brief a utility that gives finer control to users who do not want
 * to rely on hmbdc Contexts
 * @details the NetContext singleton is still needed and has to be
 * created first
 *
 * @tparam NetContext the hmbdc net transport Context to go through,
 * such as tcpcast::NetContext
 * @tparam SendMessageTuple std::tuple listing the message types to
 * send out - may be empty
 * @tparam RecvMessageTuple std::tuple listing the message types to
 * receive - empty when not specified
 */
template <class NetContext
    , class SendMessageTuple
    , class RecvMessageTuple = std::tuple<>>
struct NetPortal;
41 
42 template <typename NetContext, MessageC ... SendMessages, MessageC ... RecvMessages>
43 struct NetPortal<NetContext, std::tuple<SendMessages...>, std::tuple<RecvMessages...>> {
44  /**
45  * @brief ctor - only can be called after the NetContext singleton is created
46  * @details it does not go thru the state machine for sender / receiver
47  * hand shake if the netowork transport dictates that
48  * calling rotate several times will make it happen
49  *
50  * @param cfg a json configure that has a tx and / or rx sections
51  * the supported params are the same as individual transport
52  * @param messageQueueSizePower2Num - see doc in NetContext
53  */
54  NetPortal(Config cfg, size_t messageQueueSizePower2Num = 20)
55  : ctx_(messageQueueSizePower2Num, 0)
56  , sendEng_(sizeof...(SendMessages)?NetContext::instance().createSendTransportEngine(
57  (cfg.put("connKeyCheck", false), cfg.resetSection("tx", false))
58  , max_size_in_tuple<std::tuple<SendMessages...>>::value)
59  : nullptr)
60  , recvEng_(sizeof...(RecvMessages)?NetContext::instance().createRecvTransportEngine(
61  cfg.resetSection("rx", false)
62  , ctx_.buffer())
63  : nullptr)
64  {}
65 
66  /**
67  * @brief run the portal to send and receive messages
68  * @details it does NOT block - it is not garanteed the user
69  * will have sent or received messages. It is expected to be called
70  * multiple times
71  * thread safe
72  */
73  void rotate() {
74  if (hmbdc_likely(recvEng_)) recvEng_->rotate();
75  if (hmbdc_likely(sendEng_)) sendEng_->rotate();
76  }
77 
78  /**
79  * @brief just run the portal part that receives messages
80  * @details it does NOT block - it is not garanteed the user
81  * will have received messages. It is expected to be called
82  * multiple times
83  * thread safe
84  *
85  * @tparam MesssageRecver a type that has the handleMessageCb()
86  * callbacks - see app::Client's doc for those callbacks
87  * @param recver an instance of the receiver
88  * @return the message count that has been dispatched to the recver
89  * in this call
90  */
91  template <typename MesssageRecver>
92  size_t pullIn(MesssageRecver&& recver) {
93  using Proxy = netportal_detail::Proxy<MesssageRecver, RecvMessages...>;
94  if (hmbdc_likely(recvEng_)) recvEng_->rotate();
95 
96  Proxy proxy(std::forward<MesssageRecver>(recver));
97  ctx_.runClientThreadOnce(0, proxy);
98  return proxy.count;
99  }
100 
101  /**
102  * @brief just run the portal part that send messages
103  * @details it does NOT block - it is not garanteed the user
104  * will have sent messages depending on underly network transport.
105  * It is expected to be called multiple times
106  * thread safe
107  *
108  * @details in low traffic cases the messages are always sent out
109  * by single call
110  */
111  void flushOut() {
112  if (hmbdc_likely(sendEng_)) sendEng_->rotate();
113  }
114 
115  /**
116  * @brief get a sender for a topic
117  * @details thread safe but a slow call
118  * - so the user cashing the result is expected
119  *
120  * @param t topic for sending
121  * @return sender point. The sender is owned by the lib - do not delete
122  */
123  typename NetContext::Sender* getSender(comm::Topic const& t) {
124  if (sendEng_ && sendEng_->match(t))
125  return NetContext::instance().getSender(t);
126  return nullptr;
127  }
128 
129  /**
130  * @brief listen to a topic - has impact on the
131  * pullIn call
132  * @details threadsafe
133  *
134  * @param t topic to listen to
135  */
136  void listenTo(comm::Topic const& t) {
137  if (recvEng_) recvEng_->listenTo(t);
138  }
139 
140  /**
141  * @brief stop listening to a topic - has impact on the
142  * pullIn call
143  * @details threadsafe
144  *
145  * @param t topic to forget
146  */
147  void stopListenTo(comm::Topic const& t) {
148  if (recvEng_) recvEng_->stopListenTo(t);
149  }
150 
151  /**
152  * @brief the sending session that are active
153  * which is how many receiver are active
154  * @details only supported for reliable transport types
155  * such as tcpcast, rmast, and rnemap
156  * @return the count - return numeric_limits<size_t>::max()
157  * when the first receiver is yet to connect (initial)
158  */
160  return sendEng_?sendEng_->sessionsRemainingActive():0;
161  }
162 private:
163  using Ctx
164  = Context<max_size_in_tuple<std::tuple<RecvMessages...>>::value
166  >;
167  Ctx ctx_;
168  using SendTransportEngine = typename NetContext::SendTransportEngine;
169  SendTransportEngine* sendEng_;
170  using RecvTransportEngine = typename NetContext::template RecvTransportEngine<typename Ctx::Buffer>;
171  RecvTransportEngine* recvEng_;
172 };
173 
174 }}}
175 
size_t pullIn(MesssageRecver &&recver)
just run the portal part that receives messages
Definition: NetPortal.hpp:92
class to hold an hmbdc configuration
Definition: Config.hpp:46
NetContext::Sender * getSender(comm::Topic const &t)
get a sender for a topic
Definition: NetPortal.hpp:123
Definition: NetPortal.hpp:10
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Context template parameter indicating each message is sent to one and only one of the clients within ...
Definition: Context.hpp:77
Definition: TypedString.hpp:76
void stopListenTo(comm::Topic const &t)
stop listening to a topic - has impact on the pullIn call
Definition: NetPortal.hpp:147
void flushOut()
just run the portal part that send messages
Definition: NetPortal.hpp:111
void listenTo(comm::Topic const &t)
listen to a topic - has impact on the pullIn call
Definition: NetPortal.hpp:136
Definition: MetaUtils.hpp:96
size_t sendSessionsRemainingActive() const
the sending session that are active which is how many receiver are active
Definition: NetPortal.hpp:159
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:57
void rotate()
run the portal to send and receive messages
Definition: NetPortal.hpp:73
a utility that gives finer control to users who do not rely on hmbdc Contexts ...
Definition: NetPortal.hpp:40
NetPortal(Config cfg, size_t messageQueueSizePower2Num=20)
ctor - only can be called after the NetContext singleton is created
Definition: NetPortal.hpp:54
Definition: Base.hpp:13