hmbdc
simplify-high-performance-messaging-programming
Client.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/app/MessageDispacher.hpp"
5 #include "hmbdc/MetaUtils.hpp"
6 #include "hmbdc/Compile.hpp"
7 
8 #include <iostream>
9 #include <stdexcept>
10 #include <tuple>
11 #include <memory>
12 #include <type_traits>
13 
14 namespace hmbdc { namespace app {
15 
16 namespace client_detail {
17 template <size_t MAX_MEMORY_ATTACHMENT>
20  : att(new(underlyingMessage) hasMemoryAttachment){}
21  uint8_t underlyingMessage[MAX_MEMORY_ATTACHMENT];
22  template <MessageC Message>
23  bool cache(InBandHasMemoryAttachment<Message> const& ibma, uint16_t typeTagIn) {
24  static_assert(sizeof(Message) <= MAX_MEMORY_ATTACHMENT, "");
25  if (hmbdc_unlikely(att->attachment)) {
26  HMBDC_THROW(std::logic_error, "previous InBandMemoryAttachment not concluded");
27  }
28  typeTag = typeTagIn;
29 
30  if constexpr(Message::justBytes) {
31  memcpy(underlyingMessage, &ibma.underlyingMessage, sizeof(underlyingMessage));
32  } else {
33  memcpy(underlyingMessage, &ibma.underlyingMessage, sizeof(Message));
34  }
35 
36  if (att->holdShmHandle<Message>()) {
37  if constexpr (has_hmbdcShmRefCount<Message>::value) {
38  // auto addr = malloc(att->len);
39  auto shmAddr = hmbdcShmHandleToAddr(att->shmHandle);
40  // memcpy(addr, shmAddr, att->len);
41  att->attachment = shmAddr;
42  att->clientData[0] = (uint64_t)&hmbdcShmDeallocator;
43  static_assert(sizeof(ibma.underlyingMessage.hmbdcShmRefCount) == sizeof(size_t));
44  att->clientData[1] = (uint64_t)&ibma.underlyingMessage.hmbdcShmRefCount;
45  if constexpr (has_hmbdcShmRefCount<Message>::value) {
46  att->afterConsumedCleanupFunc = [](hasMemoryAttachment* h) {
47  auto hmbdcShmRefCount = (size_t*)h->clientData[1];
48  if (0 == __atomic_sub_fetch(hmbdcShmRefCount, 1, __ATOMIC_RELAXED)) {
49  auto& hmbdcShmDeallocator
50  = *(std::function<void (uint8_t*)>*)h->clientData[0];
51  hmbdcShmDeallocator((uint8_t*)h->attachment);
52  }
53  };
54  }
55  accSize = att->len;
56  } /// no else
57  return true;
58  } else {
59  att->attachment = malloc(att->len);
60  att->afterConsumedCleanupFunc = hasMemoryAttachment::free;
61  accSize = 0;
62  return false;
63  }
64  }
65 
66  bool accumulate(InBandMemorySeg const& ibms, size_t n) {
67  if (accSize < att->len) {
68  auto attBytes = ibms.seg;
69  auto copySize = std::min(n, att->len - accSize);
70  memcpy((char*)att->attachment + accSize, attBytes, copySize);
71  accSize += copySize;
72  return accSize == att->len;
73  }
74  return true;
75  }
76  hasMemoryAttachment * const att;
77  uint16_t typeTag = 0;
78  size_t accSize = 0;
79  std::function<void* (boost::interprocess::managed_shared_memory::handle_t)>
80  hmbdcShmHandleToAddr;
81  std::function<void (uint8_t*)> hmbdcShmDeallocator;
82 };
83 }
84 
85 /**
86  * @class single_thread_powered_client
87  * @brief a trait class, if a Client can only run on a single specific thread in Pool,
88  * derive the Client from it, hmbdc will check to ensure that is the case
89  */
91 
92 /**
93  * @class Client<>
94  * @brief A Client represents a thread of execution/a task. The execution is managed by
95  * a Context.
96  * a Client object could participate in message dispatching as the receiver of specifed
97  * message types
98  * @details message is dispatched thru callbacks in the loose
99  * form of void handleMessageCb(Message const& m) or void handleMessageCb(Message& m).
100  * some callbacks have default implementations.
101  * however, all callbacks are overridable to provide desired effects;
102  * When running in a Context, all callbacks (*Cb methods) for a hmbdc Client instance
103  * are garanteed to be called from a single OS thread, ie. no 2 callabacks of an Client
104  * instance are called concurrently, so the Client programmer can assume a
105  * single thread programming model callback-wise.
106  * The above also holds true for timer firing callbacks when a concrete Client deriving
107  * from TimerManager that manages the timers
108  *
109  * See in example hmbdc.cpp @snippet hmbdc.cpp write a Client
110  *
111  * @tparam CcClient the concrete Client type
112  * @tparam typename ... Messages message types that the Client interested
113  * if the type of JustBytes is declared, use a special callback see below
114  *
115  * @code
116  * void handleJustBytesCb(uint16_t tag, uint8_t* bytes, hasMemoryAttachment* att){...}
117  * @endcode
118  * is expected
119  * See in example hmbdc.cpp @snippet hmbdc.cpp use JustBytes instead of Message types
120  */
121 template <typename CcClient, MessageC ... Messages>
122 struct Client {
123  using Interests = std::tuple<Messages ...>;
124 
125  enum {
126  INTERESTS_SIZE = std::tuple_size<Interests>::value,
127  MAX_MEMORY_ATTACHMENT = max_size_in_tuple<
128  typename hmbdc::filter_in_tuple_by_base<hasMemoryAttachment, Interests>::type
129  >::value,
130  };
131 
132  /**
133  * @brief return the name of thread that runs this client, override if necessary
134  * @details this only used when the Client is running in direct mode
135  * @return thread name - default to be hmbdc0, hmbdc1 ...
136  */
137  char const* hmbdcName() const { return "hmbdc-anonymous"; }
138 
139  /**
140  * @brief an overrideable method.
141  * returns the schedule policy and priority, override if necessary
142  * priority is only used when policy is "SCHED_RR", or "SCHED_FIFO"
143  * @details this is only used when the Client is running in direct mode
144  * supported policy are "SCHED_OTHER"(=nullptr), "SCHED_RR", "SCHED_FIFO"
145  * @return a tuple made of schedule policy and priority, default to be SCHED_OTHER
146  */
147  std::tuple<char const*, int> schedSpec() const {
148 #ifndef _QNX_SOURCE
149  return std::make_tuple<char const*, int>(nullptr, 0);
150 #else
151  return std::make_tuple<char const*, int>(nullptr, 20);
152 #endif
153  }
154 
155  /**
156  * @brief an overridable method.
157  * client receives events in batches and the max batch size is controllable when running
158  * in direct mode Context. Here is to specify the max size.
159  * @details a message could only be reaching one client when using partition Context. In this
160  * case, reduce the size (reduce the greediness) might be useful
161  * @return the max number of messages when grabbing messages to process
162  */
163  size_t maxBatchMessageCount() const {return std::numeric_limits<size_t>::max();}
164 
165 
166  /**
167  * @brief called before any messages got dispatched - only once
168  * @details this is the place some preparation code goes to
169  *
170  * @param pClientDispatchingStarted pointer to the Client count that has started
171  * displatching within its Context; its value could
172  * change if you repeatedly check - might be used to sync with other Clients
173  * within the same Context (particularly IPC Context) at starting up stage of the system.
174  * When doing the sync, just be aware the Client might be running in a pool - do
175  * not hog the pool thread if possible.
176  * Note: Since ipc_creator Context has an implicitly purger Client, this value would be
177  * 1 greater than the user Clients count
178  */
179  virtual void messageDispatchingStartedCb(size_t const* pClientDispatchingStarted) {
180  //default do nothing
181  };
182 
183  /**
184  * @brief callback called when this Client is taken out of message dispatching
185  * @details after this call the Client is still at hook from the Context point of view
186  * (until droppedCb is called), so don't delete this Client yet or add it back to
187  * the Context. any exception thrown here is ignored,
188  *
189  * @param e the exception that caused the Client to be taken out of message dispatching
190  * e could be thrown by the Client itself in a callback function
191  * to voluntarily take itself out
192  */
193  virtual void stoppedCb(std::exception const& e) {
194  // called when this client is stopped, e.what() might have reason
195  std::cerr << "Client " << static_cast<CcClient*>(this)->hmbdcName()
196  << " exited with reason: " << e.what() << std::endl;
197  };
198 
199  /**
200  * @brief callback called after the Client is safely taken out of the Context
201  * @details exception thrown here is ignored and return true is assumed
202  * @return if false, this Client is added back to the Context to process messages
203  * otherwise, no more callback. You could even safely "delete this; return true;"
204  */
205  virtual bool droppedCb() {
206  // called when this client is dropped and free to be deleted
207  return true;
208  };
209 
210  /**
211  * @brief this callback is called all the time (frequently) - the exact timing is
212  * after a batch of messages are dispatched. After this call returns, the previously
213  * dispatched message's addresses are no longer valid, which means if you cache the
214  * event addresses in the previous handleMessageCb()s, you cannot use those after
215  * the return of the next invokeCb function.
216  * @details you can collectively process the messages received/cached so far here, or
217  * do something needs to be done all the time like powering another message loop
218  *
219  * @param dispatched the number of messages dispatched since last invokedCb called
220  */
221  virtual void invokedCb(size_t dispatched) {
222  // called as frequently as workload allowed
223  }
224 
225  /**
226  * @brief trivial
227  */
228  virtual ~Client(){}
229 
230 protected:
231  /**
232  * @brief the derived user's Client has the option to stop the current batch of event
233  * dispatching.
234  * @details for example, if the user's Client decides it has handled all interested
235  * messages and wants to skip the remaining messages within the CURRENT batch, call
236  * this within any callback functions.
237  * use with caution, only when it is absolutely certain that there is NO messages of
238  * ANY type the Client need to care are within the current batch.
239  */
240  void batchDone() {
241  batchDone_ = true;
242  }
243 
244 public:
245  /**
246  * @brief the following are for internal use, don't change or override
247  */
248  void stopped(std::exception const&e) noexcept {
249  try {
250  stoppedCb(e);
251  } catch (...) {}
252  }
253 
254  bool dropped() noexcept {
255  bool res = true;
256  try {
257  res = droppedCb();
258  } catch (...) {}
259  return res;
260  }
261 
262  template <typename Iterator>
263  size_t handleRangeImpl(Iterator it,
264  Iterator end, uint16_t threadId) {
265  CcClient& c = static_cast<CcClient&>(*this);
266  size_t res = 0;
267  for (;hmbdc_likely(!batchDone_ && it != end); ++it) {
268  if (MessageDispacher<CcClient, Interests>()(
269  c, *static_cast<MessageHead*>(*it))) res++;
270  }
271  batchDone_ = false;
272  return res;
273  }
274  typename std::conditional<MAX_MEMORY_ATTACHMENT != 0
275  , client_detail::InBandMemoryAttachmentProcessor<MAX_MEMORY_ATTACHMENT>
276  , std::nullptr_t>::type ibmaProc;
277 
278  uint16_t hmbdcNumber = 0xffff;
279 
280 protected:
281  bool batchDone_ = false; ///not part of API, do not touch
282 };
283 
284 /**
285  * if you do not have Message paramter pack, just have tuple
286  */
287 template <typename CcClient, typename MessageTuple>
289 template <typename CcClient, MessageC ... Messages>
290 struct client_using_tuple<CcClient, std::tuple<Messages...>> {
291  using type = Client<CcClient, Messages...>;
292 };
293 }} //hmbdc::app
virtual void invokedCb(size_t dispatched)
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: Client.hpp:221
char const * hmbdcName() const
return the name of thread that runs this client, override if necessary
Definition: Client.hpp:137
virtual ~Client()
trivial
Definition: Client.hpp:228
boost::interprocess::managed_shared_memory ::handle_t shmHandle
Definition: Message.hpp:169
Definition: TypedString.hpp:84
std::tuple< char const *, int > schedSpec() const
an overrideable method. returns the schedule policy and priority, override if necessary priority is o...
Definition: Client.hpp:147
bool cache(InBandHasMemoryAttachment< Message > const &ibma, uint16_t typeTagIn)
Definition: Client.hpp:23
virtual bool droppedCb()
callback called after the Client is safely taken out of the Context
Definition: Client.hpp:205
void batchDone()
the derived user&#39;s Client has the option to stop the current batch of event dispatching.
Definition: Client.hpp:240
void stopped(std::exception const &e) noexcept
the following are for internal use, don&#39;t change or override
Definition: Client.hpp:248
size_t maxBatchMessageCount() const
an overridable method. client receives events in batches and the max batch size is controllable when ...
Definition: Client.hpp:163
Definition: Client.hpp:288
Definition: Message.hpp:459
virtual void stoppedCb(std::exception const &e)
callback called when this Client is taken out of message dispatching
Definition: Client.hpp:193
a trait class, if a Client can only run on a single specific thread in Pool, derive the Client from i...
Definition: Client.hpp:90
Definition: MetaUtils.hpp:100
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:122
virtual void messageDispatchingStartedCb(size_t const *pClientDispatchingStarted)
called before any messages got dispatched - only once
Definition: Client.hpp:179
a std tuple holding messages types it can dispatch
Definition: BlockingContext.hpp:206
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Base.hpp:12
uint64_t clientData[2]
byte size of the above
Definition: Message.hpp:183
Definition: Message.hpp:430