hmbdc
simplify-high-performance-messaging-programming
NetPortal.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Message.hpp"
4 #include "hmbdc/pattern/GuardedSingleton.hpp"
5 
6 #include <functional>
7 #include <tuple>
8 
9 namespace hmbdc { namespace app { namespace utils {
10 
11 namespace netportal_detail {
12 template <typename ...RecvMessages>
13 uint32_t itemSizePower2Num() {
14  uint32_t res = 1ul;
15  auto valueTypeSize = max_size_in_tuple<std::tuple<RecvMessages...>>::value
16  + sizeof(uint64_t);
17  while ((1ul << res) < valueTypeSize) res++;
18  return res;
19 }
20 } //detail
21 
22 /**
23  * @brief a utility that gives finer control to users who do
24  * not rely on hmbdc Contexts
25  * @details still needs to use NetContext singleton which needs
26  * to be created first
27  *
28  * @tparam NetContext - on the hmbdc net transport Context such as
29  * tcpcast::NetContext, ... etc
30  * @tparam SendMessageTuple - std tuple containing the message types to
31  * send out - can be empty
32  * @tparam RecvMessageTuple - std tuple containing the message types to
33  * receive - default to be empty
34  *
35  */
36 template <typename NetContext, typename SendMessageTuple
37  , typename RecvMessageTuple = std::tuple<>>
38 struct NetPortal;
39 
40 template <typename NetContext, MessageC ... SendMessages, MessageC ... RecvMessages>
41 struct NetPortal<NetContext, std::tuple<SendMessages...>, std::tuple<RecvMessages...>> {
42  /**
43  * @brief ctor - only can be called after the NetContext singleton is created
44  * @details it does not go thru the state machine for sender / receiver
45  * hand shake if the network transport dictates that
46  * calling rotate several times will make it happen
47  *
48  * @param cfg a json configure that has a tx and / or rx sections
49  * the supported params are the same as individual transport
50  */
52  : outOneBuffer_(netportal_detail::itemSizePower2Num<RecvMessages...>())
53  , sendEng_(sizeof...(SendMessages)?NetContext::instance().createSendTransportEngine(
54  (cfg.put("connKeyCheck", false), cfg.resetSection("tx", false))
55  , max_size_in_tuple<std::tuple<SendMessages...>>::value)
56  : nullptr)
57  , recvEng_(sizeof...(RecvMessages)?NetContext::instance().createRecvTransportEngine(
58  cfg.resetSection("rx", false)
59  , outOneBuffer_)
60  : nullptr)
61  {}
62 
63  /**
64  * @brief run the portal to send and receive messages
65  * @details it does NOT block - it is not guaranteed the user
66  * will have sent or received messages. It is expected to be called
67  * multiple times
68  * thread safe
69  */
70  void rotate() {
71  if (hmbdc_likely(recvEng_)) recvEng_->rotate();
72  if (hmbdc_likely(sendEng_)) sendEng_->rotate();
73  }
74 
75  /**
76  * @brief get the incoming messages and dispatch them to
77  * the receiving code
78  * @details it does NOT block - it is not guaranteed the user
79  * will have received messages if there is nothing received.
80  * It is expected to be called whenever the user code feels like receiving messages.
81  * This is NOT a thread safe or reentrant function.
82  *
83  * @tparam MesssageRecver a type that has the handleMessageCb()
84  * callbacks - see app::Client's doc for those callbacks
85  * @tparam MesssageRecverInterestsTuple tuple containing the message types
86  * of the receiver can handle; If a messages that the receiver is not interested
87  * is received, it is simply dropped.
88  * @param recver an instance of the receiver
89  * @return the message count that has been dispatched to the recver
90  * in this call
91  */
92  template <typename MesssageRecver
93  , typename MesssageRecverInterestsTuple = std::tuple<RecvMessages...>>
94  size_t pullIn(MesssageRecver&& recver) {
95  outOneBuffer_.dispCount = 0;
96  if (hmbdc_likely(recvEng_)) {
97  outOneBuffer_.postCommit = [&recver](MessageHead& h) {
98  using M = typename std::decay<MesssageRecver>::type;
99  MessageDispacher<M, MesssageRecverInterestsTuple> disp;
100  return disp(recver, h);
101  };
102  recvEng_->rotate();
103  outOneBuffer_.postCommit
104  = [](MessageHead&){return true;}; //clear
105  }
106  return outOneBuffer_.dispCount;
107  }
108 
109  /**
110  * @brief just run the portal part that send messages
111  * @details it does NOT block - it is not guaranteed the user
112  * will have sent messages, depending on the underlying network transport.
113  * It is expected to be called multiple times
114  * thread safe
115  *
116  * @details in low traffic cases the messages are always sent out
117  * by single call
118  */
119  void flushOut() {
120  if (hmbdc_likely(sendEng_)) sendEng_->rotate();
121  }
122 
123  /**
124  * @brief get a sender for a topic
125  * @details thread safe but a slow call
126  * - so the user caching the result is expected
127  *
128  * @param t topic for sending
129  * @return sender pointer. The sender is owned by the lib - do not delete
130  */
131  typename NetContext::Sender* getSender(comm::Topic const& t) {
132  if (sendEng_ && sendEng_->match(t))
133  return NetContext::instance().getSender(t);
134  return nullptr;
135  }
136 
137  /**
138  * @brief get the send engine
139  * @return sender engine pointer. The sender engine is owned by the lib - do not delete
140  */
141  auto getSendEngine() {
142  return sendEng_;
143  }
144 
145  /**
146  * @brief listen to a topic - has impact on the
147  * pullIn call
148  * @details threadsafe
149  *
150  * @param t topic to listen to
151  */
152  void listenTo(comm::Topic const& t) {
153  if (recvEng_) recvEng_->listenTo(t);
154  }
155 
156  /**
157  * @brief stop listening to a topic - has impact on the
158  * pullIn call
159  * @details threadsafe
160  *
161  * @param t topic to forget
162  */
163  void stopListenTo(comm::Topic const& t) {
164  if (recvEng_) recvEng_->stopListenTo(t);
165  }
166 
167  /**
168  * @brief the sending session that are active
169  * which is how many receiver are active
170  * @details only supported for reliable transport types
171  * such as tcpcast, rmast, and rnemap
172  * @return the count - return numeric_limits<size_t>::max()
173  * when the first receiver is yet to connect (initial)
174  */
176  return sendEng_?sendEng_->sessionsRemainingActive():0;
177  }
178 
179  /**
180  * @brief the recving session that are active
181  * which is how many sending parties are active
182  * @details only supported for reliable transport types
183  * such as tcpcast, rmast, and rnemap
184  * @return the count
185  */
187  return recvEng_?recvEng_->sessionsRemainingActive():0;
188  }
189 
190 private:
// Buffer type handed to the receive transport engine (see the
// RecvTransportEngine<OneBuffer> alias and the ctor). It reports a capacity of
// one item and forwards every delivered message to the postCommit callback
// instead of queueing it.
// NOTE(review): listing lines 192 and 195 are missing from this view; the
// `iterator` type and the `buffer` member used below are presumably declared
// there - confirm against the original header.
191  struct OneBuffer {
193  using value_type = void *;
// item size in bytes: 2 to the power passed to the constructor
194  const size_t valueTypeSize;
// invoked for each delivered message; a true result counts as dispatched.
// The default accepts every message.
196  std::function<bool (MessageHead&)> postCommit
197  = [](MessageHead&){return true;};
// incremented each time postCommit returns true
198  size_t dispCount = 0;
199 
// @param valueTypeSizePower2Num exponent e; the item size becomes 2^e bytes.
// The same exponent is also forwarded to the `buffer` member's constructor.
200  OneBuffer(size_t valueTypeSizePower2Num)
201  : valueTypeSize(1ul << valueTypeSizePower2Num)
202  , buffer(valueTypeSizePower2Num, 0) {}
203 
204  size_t capacity() const {return 1;};
205  size_t maxItemSize() const {return valueTypeSize;}
// Treats item as pointing at a MessageHead and runs postCommit on it;
// sizeHint is not used by this implementation.
206  void put(void* item, size_t sizeHint = 0) {
207  if (postCommit(*(MessageHead*)(item)))
208  dispCount++;
209  }
210 
// typed conveniences that forward to put(void*, size_t)
211  template <typename T> void put(T& item) {put(&item, sizeof(T));}
212  template <typename T> void putSome(T& item) {put(&item, std::min(sizeof(item), maxItemSize()));}
// always yields an iterator built from (buffer, 0)
213  iterator claim() {
214  return iterator(buffer, 0);
215  }
// Treats *it as pointing at a MessageHead and runs postCommit on it,
// counting the message when postCommit returns true.
216  void commit(iterator it) {
217  if (postCommit(*(MessageHead*)(*it)))
218  dispCount++;
219  }
220  };
221 
// Receive slot; declared first so it is initialized before recvEng_, whose
// creation in the ctor is passed outOneBuffer_.
222  OneBuffer outOneBuffer_;
223  using SendTransportEngine = typename NetContext::SendTransportEngine;
// nullptr when SendMessages is empty; obtained from NetContext::instance()
// and never deleted in this struct (no destructor is defined in this view)
224  SendTransportEngine* sendEng_;
225  using RecvTransportEngine = typename NetContext::template RecvTransportEngine<OneBuffer>;
// nullptr when RecvMessages is empty; obtained from NetContext::instance()
// and never deleted in this struct (no destructor is defined in this view)
226  RecvTransportEngine* recvEng_;
227 };
228 
229 }}}
230 
class to hold an hmbdc configuration
Definition: Config.hpp:46
NetContext::Sender * getSender(comm::Topic const &t)
get a sender for a topic
Definition: NetPortal.hpp:131
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
Definition: Topic.hpp:44
size_t pullIn(MesssageRecver &&recver)
get the incoming messages and dispatch them to the receiving code
Definition: NetPortal.hpp:94
Definition: Message.hpp:78
void stopListenTo(comm::Topic const &t)
stop listening to a topic - has impact on the pullIn call
Definition: NetPortal.hpp:163
void flushOut()
just run the portal part that send messages
Definition: NetPortal.hpp:119
void listenTo(comm::Topic const &t)
listen to a topic - has impact on the pullIn call
Definition: NetPortal.hpp:152
Definition: MetaUtils.hpp:96
size_t sendSessionsRemainingActive() const
the sending session that are active which is how many receiver are active
Definition: NetPortal.hpp:175
size_t recvSessionsRemainingActive() const
the recving session that are active which is how many sending parties are active
Definition: NetPortal.hpp:186
void rotate()
run the portal to send and receive messages
Definition: NetPortal.hpp:70
an utility that gives finer control to the user that do not rely on hmbdc Contexts ...
Definition: NetPortal.hpp:38
NetPortal(Config cfg)
ctor - only can be called after the NetContext singleton is created
Definition: NetPortal.hpp:51
Definition: Base.hpp:13
Definition: LockFreeBufferMisc.hpp:89