hmbdc
simplify-high-performance-messaging-programming
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Logger.hpp"
4 #include "hmbdc/app/tcpcast/Transport.hpp"
5 #include "hmbdc/app/tcpcast/SendServer.hpp"
6 #include "hmbdc/app/tcpcast/Messages.hpp"
7 #include "hmbdc/app/udpcast/SendTransportEngine.hpp"
8 #include "hmbdc//MetaUtils.hpp"
9 #include "hmbdc/time/Time.hpp"
10 #include "hmbdc/time/Rater.hpp"
11 #include "hmbdc/numeric/BitMath.hpp"
12 
13 #include <boost/circular_buffer.hpp>
14 #include <memory>
15 #include <tuple>
16 #include <regex>
17 #include <type_traits>
18 #include <mutex>
19 
20 namespace hmbdc { namespace app { namespace tcpcast {
21 
22 namespace send_detail {
23 using namespace hmbdc::time;
24 using namespace std;
25 using pattern::MonoLockFreeBuffer;
26 /**
27  * @brief capture the transportation mechanism
28  *
29  */
31 : Transport {
32  using ptr = std::shared_ptr<SendTransport>;
33  /**
34  * @brief ctor
35  * @param cfg jason specifing the transport - see example, perf-tcpcast.cpp
36  * @param maxMessageSize max messafe size in bytes to be sent
37  */
38  SendTransport(Config const&, size_t);
39  template <typename... Messages>
40  void queue(Topic const& t, Messages&&... msgs) {
41  auto n = sizeof...(msgs);
42  auto it = buffer_.claim(n);
43  queue(it, t, std::forward<Messages>(msgs)...);
44  buffer_.commit(it, n);
45  }
46 
47  template <typename... Messages>
48  bool tryQueue(Topic const& t, Messages&&... msgs) {
49  auto n = sizeof...(msgs);
50  auto it = buffer_.tryClaim(n);
51  if (it) {
52  queue(it, t, std::forward<Messages>(msgs)...);
53  buffer_.commit(it, n);
54  return true;
55  }
56  return false;
57  }
58 
59  template <typename Message, typename ... Args>
60  void queueInPlace(Topic const& t, Args&&... args) {
61  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
62  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
64  , "hasMemoryAttachment has to the first base for Message");
65  auto s = buffer_.claim();
66  char* addr = static_cast<char*>(*s);
67  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
68  h->flag = calculateFlag<Message>();
69  h->messagePayloadLen = sizeof(MessageWrap<Message>);
70  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
71  h->topicLen = tl;
72  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
73  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
74  } else {
75  HMBDC_THROW(std::out_of_range
76  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
77  }
78  buffer_.commit(s);
79  }
80 
81  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len);
82 
83  void stop();
84 
85  bool match(Topic const& t) const {
86  return std::regex_match(t.c_str(), topicRegex_);
87  }
88 
89 private:
90  std::string topic_;
91  std::regex topicRegex_;
92 
93 protected:
94  size_t maxMessageSize_;
95  size_t minRecvToStart_;
96  MonoLockFreeBuffer buffer_;
97 
98  unique_ptr<udpcast::SendTransport> mcSendTransport_;
99  Rater rater_;
100  Config mcConfig_;
101 
102 private:
103 
104  template <typename Message>
105  static
106  uint8_t calculateFlag() {
107  if (is_base_of<hasMemoryAttachment, Message>::value) return hasMemoryAttachment::flag;
108  return 0;
109  }
110 
111  template<typename M, typename ... Messages>
112  void queue(MonoLockFreeBuffer::iterator it, Topic const& t
113  , M&& m, Messages&&... msgs) {
114  using Message = typename std::remove_reference<M>::type;
115  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
116  static_assert(!is_base_of<hasMemoryAttachment, Message>::value
118  , "hasMemoryAttachment has to the first base for Message");
119 
120  auto s = *it;
121  char* addr = static_cast<char*>(s);
122  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
123  h->flag = calculateFlag<Message>();
124  h->messagePayloadLen = sizeof(MessageWrap<Message>);
125  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
126  h->topicLen = tl;
127  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
128  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
129  } else {
130  HMBDC_THROW(std::out_of_range, "maxMessageSize too small to hold a message when constructing SendTransportEngine");
131  }
132  queue(++it, t, std::forward<Messages>(msgs)...);
133  }
134 
135  void queue(MonoLockFreeBuffer::iterator it
136  , Topic const& t) {}
137 };
138 
141 , TimerManager
143 , Client<SendTransportEngine> {
144  SendTransportEngine(Config const&, size_t);
145  using SendTransport::hmbdcName;
146  using SendTransport::schedSpec;
147 
148 /**
149  * @brief if the user choose no to have a Context to manage and run the engine
150  * this method can be called from any thread from time to time to have the engine
151  * do its job
152  * @details thread safe and non-blocking, if the engine is already being powered by a Context,
153  * this method has no effect
154  */
155  void rotate() {
156  if (runLock_.try_lock()) {
157  this->checkTimers(time::SysTime::now());
159  runLock_.unlock();
160  }
161  }
162 
163  /*virtual*/
164  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
165  runLock_.lock();
166  }
167 
168  /*virtual*/
169  void invokedCb(size_t) HMBDC_RESTRICT override {
170  runOnce();
171  mcSendTransport_->runOnce(true);
172  }
173 
174  /*virtual*/
175  bool droppedCb() override {
176  stop();
177  stopped_ = true;
178  return true;
179  };
180 
181  /**
182  * @brief check how many recipient sessions are still active
183  * @return numeric_limits<size_t>::max() if the sending hasn't started due to
184  * minRecvToStart has not been met yet
185  */
186  size_t sessionsRemainingActive() const {
187  if (stopped_) return 0;
188  if (server_ && !minRecvToStart_) {
189  return server_->readySessionCount();
190  }
191  return numeric_limits<size_t>::max();
192  }
193 
195  this->runLock_.unlock();
196  }
197 
198 private:
199  void cullSlow() {
200  auto seq = buffer_.readSeq();
201  if (seq == lastSeq_ && buffer_.isFull()) {
202  server_->killSlowestSession();
203  }
204  lastSeq_ = seq;
205  }
206 
207  void runOnce();
208 
209  size_t mtu_;
210  size_t maxSendBatch_;
211  bool waitForSlowReceivers_;
213  ToSend toSend_;
214  // SendServer *server_;
215  // unique_ptr<SyncSendServer> syncServer_;
216  unique_ptr<SendServer> server_;
217  size_t lastSeq_;
218  bool stopped_;
219  std::mutex runLock_;
220 };
221 } //send_detail
222 
225 }}}
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:169
Definition: MonoLockFreeBuffer.hpp:15
capture the transportation mechanism
Definition: SendTransportEngine.hpp:30
void rotate()
if the user choose no to have a Context to manage and run the engine this method can be called from a...
Definition: SendTransportEngine.hpp:155
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:175
Definition: Topic.hpp:44
Definition: Timers.hpp:66
Definition: Transport.hpp:69
Definition: MetaUtils.hpp:36
Definition: SendTransportEngine.hpp:139
size_t sessionsRemainingActive() const
check how many recipient sessions are still active
Definition: SendTransportEngine.hpp:186
Definition: Message.hpp:112
Definition: Rater.hpp:10
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
called before any messages got dispatched - only once
Definition: SendTransportEngine.hpp:164
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
Definition: Rater.hpp:11
Definition: Base.hpp:13
Definition: Timers.hpp:113
Definition: LockFreeBufferMisc.hpp:89