hmbdc
simplify-high-performance-messaging-programming
 All Classes Namespaces Functions Variables Friends Pages
SendTransportEngine.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/udpcast/Transport.hpp"
4 #include "hmbdc/app/udpcast/Messages.hpp"
5 #include "hmbdc/app/Base.hpp"
6 #include "hmbdc/comm/inet/Endpoint.hpp"
7 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
8 #include "hmbdc/time/Time.hpp"
9 #include "hmbdc/time/Rater.hpp"
10 #include "hmbdc/numeric/BitMath.hpp"
11 
12 #include <regex>
13 #include <memory>
14 #include <iostream>
15 #include <mutex>
16 
17 namespace hmbdc { namespace app { namespace udpcast {
18 
19 namespace sendtransportengine_detail {
20 using namespace std;
21 using namespace hmbdc::time;
22 using namespace hmbdc::pattern;
24 
26 : Transport {
27  using ptr = std::shared_ptr<SendTransport>;
28  SendTransport(Config const&, size_t);
29  ~SendTransport();
30 
/**
 * @brief check whether a topic is handled by this transport
 * @details runs std::regex_match on the topic's C string against topicRegex_,
 * so the whole topic string has to match the pattern, not just a part of it
 * @param t topic to test
 * @return true if t matches topicRegex_ in full, false otherwise
 */
31  bool match(Topic const& t) const {
32  return std::regex_match(t.c_str(), topicRegex_);
33  }
34 
/**
 * @brief queue one or more messages for sending under the same topic
 * @details claims sizeof...(msgs) slots from buffer_ in one call, fills them
 * one message per slot through the private queue() overload, then commits
 * all of them together
 * NOTE(review): buffer_.claim(n) presumably waits until n slots are free
 * (tryQueue is the variant that can fail) - confirm against the Buffer type
 * @tparam Messages message types, each needs to be trivially destructible
 * (enforced by the private queue() overload)
 * @param t topic the messages are sent on
 * @param msgs the messages, perfectly forwarded into the buffer slots
 */
35  template <typename... Messages>
36  void queue(Topic const& t, Messages&&... msgs) {
37  auto n = sizeof...(msgs);
38  auto it = buffer_.claim(n);
39  queue(it, t, std::forward<Messages>(msgs)...);
40  buffer_.commit(it, n);
41  }
42 
/**
 * @brief attempt to queue one or more messages under the same topic
 * @details asks buffer_.tryClaim() for sizeof...(msgs) slots; when the returned
 * iterator tests true the messages are written and committed exactly like
 * queue() does, otherwise nothing is written
 * @tparam Messages message types, each needs to be trivially destructible
 * (enforced by the private queue() overload)
 * @param t topic the messages are sent on
 * @param msgs the messages, perfectly forwarded into the buffer slots
 * @return true if all messages were queued, false if the slots could not be claimed
 */
43  template <typename... Messages>
44  bool tryQueue(Topic const& t, Messages&&... msgs) {
45  auto n = sizeof...(msgs);
46  auto it = buffer_.tryClaim(n);
47  if (it) {
48  queue(it, t, std::forward<Messages>(msgs)...);
49  buffer_.commit(it, n);
50  return true;
51  }
52  return false;
53  }
54 
/**
 * @brief construct a message directly inside a buffer slot and queue it
 * @details claims a single slot from buffer_ and lays it out as:
 * TransportMessageHeader, then the topic bytes written by t.copyTo(), then a
 * MessageWrap<Message> constructed in place from args. The value returned by
 * t.copyTo() is stored as the header's topicLen and used as the payload offset.
 * @tparam Message message type to construct, has to be trivially destructible
 * @tparam Args constructor argument types
 * @param t topic the message is sent on
 * @param args forwarded to the MessageWrap<Message> constructor
 * @throws std::out_of_range (raised through HMBDC_THROW) when
 * sizeof(Message) is greater than maxMessageSize_
 */
55  template <typename Message, typename ... Args>
56  void queueInPlace(Topic const& t, Args&&... args) {
57  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
58  auto s = buffer_.claim();
59  char* addr = static_cast<char*>(*s);
60  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
61  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
62  h->topicLen = tl;
// the size test depends only on compile time sizeof(Message) and the configured limit
63  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
64  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<Args>(args)...);
65  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
66  } else {
// NOTE(review): the slot claimed above is not committed on this path - confirm
// that leaving a claimed-but-uncommitted slot behind is acceptable for Buffer
67  HMBDC_THROW(std::out_of_range
68  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
69  }
70  buffer_.commit(s);
71  }
72 
73  void queueBytes(Topic const& t, uint16_t tag, void const* bytes, size_t len);
74 
/**
 * @brief perform one iteration of the transport's work
 * @details always calls resumeSend() first; afterwards polls the
 * utils::EpollTask singleton if either alwaysPoll is set or isFdReady()
 * reports false. The poll branch is hinted as the unlikely one.
 * @param alwaysPoll when true the epoll poll happens regardless of isFdReady()
 */
75  void runOnce(bool alwaysPoll = false) HMBDC_RESTRICT {
76  resumeSend();
77  if (hmbdc_unlikely(alwaysPoll || !isFdReady())) {
78  utils::EpollTask::instance().poll();
79  }
80  }
81 
/**
 * @brief serialize a topic and a message into a caller supplied buffer
 * @details writes a TransportMessageHeader at the start of buf, the topic
 * bytes (via t.copyTo()) right after it, and then a MessageWrap copy of m
 * constructed in place after the topic. topicLen and messagePayloadLen() in
 * the header are filled in accordingly. Does not touch any member state.
 * NOTE(review): the size test compares only sizeof(Message) with bufLen while
 * the bytes written span sizeof(TransportMessageHeader) + topic length +
 * sizeof(MessageWrap<Message>), and the topic is copied before the test -
 * confirm callers always pass a buffer large enough for the full layout
 * @tparam M message type (decayed before use)
 * @param t topic to encode
 * @param buf destination buffer
 * @param bufLen size value the message size is checked against
 * @param m message to copy into the buffer
 * @return pointer to the header at the beginning of buf
 * @throws std::out_of_range (raised through HMBDC_THROW) when
 * sizeof(Message) is greater than bufLen
 */
82  template <typename M>
83  static TransportMessageHeader* encode(Topic const& t, void* buf, size_t bufLen, M const& m) {
84  using Message = typename std::decay<M>::type;
85  char* addr = static_cast<char*>(buf);
86  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
87  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
88  h->topicLen = tl;
89  if (hmbdc_likely(sizeof(Message) <= bufLen)) {
90  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(m);
91  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
92  } else {
93  HMBDC_THROW(std::out_of_range, "bufLen too small to hold a message");
94  }
95  return h;
96  }
97 
98  void stop();
99 
100 private:
101  std::string topic_;
// pattern that match() tests topics against
102  std::regex topicRegex_;
// upper bound for sizeof(Message) checked by queue() / queueInPlace()
103  size_t maxMessageSize_;
104  typename Buffer::iterator begin_, it_, end_;
// outgoing message buffer; slots are claimed and committed by the queue*() methods
105  Buffer buffer_;
106  Rater rater_;
107  size_t maxSendBatch_;
108 
// NOTE(review): the members below are not used by any code visible in this
// listing; presumably they are the pending iovec / mmsghdr send bookkeeping
// used by resumeSend() - ownership of the raw pointers needs confirming
109  iovec* toSendMsgs_;
110  size_t toSendMsgsHead_;
111  size_t toSendMsgsTail_;
112  mmsghdr* toSendPkts_;
113  size_t toSendPktsHead_;
114  size_t toSendPktsTail_;
115  std::vector<comm::inet::Endpoint> udpcastDests_;
116 
117  uint16_t
118  outBufferSizePower2();
119 
/**
 * @brief write the first message of a pack into the slot at it, then recurse
 * @details the slot is laid out as TransportMessageHeader, topic bytes
 * (written by t.copyTo()), then a MessageWrap<Message> constructed in place
 * from m. After that the function calls itself with the advanced iterator and
 * the remaining messages; the recursion ends at the overload taking no message.
 * Claiming and committing the slots is left to the caller.
 * @tparam M type of the message handled by this call (decayed before use,
 * has to be trivially destructible)
 * @tparam Messages types of the messages still to be written
 * @param it iterator to the already claimed slot for m
 * @param t topic written into every slot
 * @param m message for this slot
 * @param msgs messages for the following slots
 * @throws std::out_of_range (raised through HMBDC_THROW) when
 * sizeof(Message) is greater than maxMessageSize_
 */
120  template<typename M, typename ... Messages>
121  void queue(typename Buffer::iterator it, Topic const& t
122  , M&& m, Messages&&... msgs) {
123  using Message = typename std::decay<M>::type;
124  static_assert(std::is_trivially_destructible<Message>::value, "cannot send message with dtor");
125  auto s = *it;
126  char* addr = static_cast<char*>(s);
127  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
128  auto tl = t.copyTo(addr + sizeof(TransportMessageHeader));
129  h->topicLen = tl;
130  if (hmbdc_likely(sizeof(Message) <= maxMessageSize_)) {
131  new (addr + sizeof(TransportMessageHeader) + tl) MessageWrap<Message>(std::forward<M>(m));
132  h->messagePayloadLen() = sizeof(MessageWrap<Message>);
133  } else {
134  HMBDC_THROW(std::out_of_range
135  , "maxMessageSize too small to hold a message when constructing SendTransportEngine");
136  }
// move on to the next claimed slot with the rest of the pack
137  queue(++it, t, std::forward<Messages>(msgs)...);
138  }
139 
/**
 * @brief end of recursion for the variadic queue() overload
 * @details selected once no messages are left in the pack; does nothing
 */
140  void queue(typename Buffer::iterator it
141  , Topic const& t) {}
142  void resumeSend();
143 };
144 
147 , Client<SendTransportEngine> {
148  using SendTransport::SendTransport;
149  using SendTransport::hmbdcName;
150  using SendTransport::schedSpec;
151 
152 /**
153  * @brief if the user chooses not to have a Context to manage and run the engine
154  * this method can be called from any thread from time to time to have the engine
155  * do its job
156  * @details thread safe and non-blocking, if the engine is already being powered by a Context,
157  * this method has no effect
158  */
159  void rotate() {
// try_lock() never waits: if runLock_ is already held (it is taken in
// messageDispatchingStartedCb()) this call simply falls through
160  if (runLock_.try_lock()) {
// NOTE(review): listing line 161 is not present in this extract; presumably
// the engine work is run here while the lock is held - confirm against the
// original header
162  runLock_.unlock();
163  }
164  }
165 
/**
 * @brief takes runLock_ when message dispatching starts
 * @details the lock is acquired with a blocking lock() and is not released in
 * this method; while it is held rotate()'s try_lock() fails, so rotate()
 * does nothing
 * @param threadSerialNumber not used
 */
166  /*virtual*/
167  void messageDispatchingStartedCb(uint16_t threadSerialNumber) override {
168  runLock_.lock();
169  }
170 
/**
 * @brief runs one iteration of the transport each time the callback fires
 * @details forwards to runOnce() with its default argument (alwaysPoll = false);
 * the uint16_t argument is ignored
 */
171  /*virtual*/
172  void invokedCb(uint16_t) HMBDC_RESTRICT override {
173  runOnce();
174  }
/**
 * @brief stops the transport when the callback fires
 * @details calls stop() and then reports true
 * NOTE(review): the meaning of the returned true is defined by the Client
 * base class, which is not visible here - confirm
 * @return always true
 */
175  /*virtual*/ bool droppedCb() override {
176  stop();
177  return true;
178  };
179 
181  this->runLock_.unlock();
182  }
183 
184 private:
185  std::mutex runLock_;
186 };
187 
188 } //sendtransportengine_detail
189 using SendTransport = sendtransportengine_detail::SendTransport;
190 using SendTransportEngine = sendtransportengine_detail::SendTransportEngine;
191 }}}
Definition: MonoLockFreeBuffer.hpp:15
Definition: Transport.hpp:39
class to hold an hmbdc configuration
Definition: Config.hpp:46
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void invokedCb(uint16_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: SendTransportEngine.hpp:172
Definition: Message.hpp:112
void messageDispatchingStartedCb(uint16_t threadSerialNumber) override
called before any messages got dispatched - only once
Definition: SendTransportEngine.hpp:167
A Client represents a thread of execution/a task. The execution is managed by a Context. A Client object could participate in message dispatching as the receiver of specified message types.
Definition: Client.hpp:57
Definition: Rater.hpp:11
bool droppedCb() override
callback called after the Client is safely taken out of the Context
Definition: SendTransportEngine.hpp:175
void rotate()
if the user chooses not to have a Context to manage and run the engine this method can be called from a...
Definition: SendTransportEngine.hpp:159
Definition: LockFreeBufferMisc.hpp:89