hmbdc
simplify-high-performance-messaging-programming
Message.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/time/Time.hpp"
5 #include "hmbdc/Config.hpp"
6 #include "hmbdc/app/Logger.hpp"
7 #include "hmbdc/Endian.hpp"
8 #include "hmbdc/MetaUtils.hpp"
9 #include "hmbdc/Exception.hpp"
10 
11 #include <boost/interprocess/managed_shared_memory.hpp>
12 #include <utility>
13 #include <ostream>
14 #include <type_traits>
15 #include <bitset>
16 #include <cstddef>
17 #include <memory.h>
18 #include <stdint.h>
19 
20 namespace hmbdc { namespace app {
21 
22 // #if __GNUC_PREREQ(10,0)
23 
24 // template <typename M>
25 // concept MessageC = std::decay<M>::type::typeTag >= 0x0
26 // && std::decay<M>::type::typeTag <= 0xffffu;
27 
28 // namespace message_detail {
29 // template <typename ...M>
30 // constexpr bool is_message_tuple = false;
31 
32 // template <MessageC ...M>
33 // constexpr bool is_message_tuple<std::tuple<M...>> = true;
34 // }
35 
36 // template <typename MT>
37 // concept MessageTupleC = message_detail::is_message_tuple<MT>;
38 
39 // template <typename MIter>
40 // concept MessageForwardIterC = MessageC<decltype(*(MIter{}))>
41 // && requires(MIter it) {++it;};
42 
43 // #else
44 
45 #define MessageC typename
46 #define MessageTupleC typename
47 #define MessageForwardIterC typename
48 
49 // #endif
50 
51 /**
52 * see @example hmbdc.cpp
53 * @example perf-tcpcast.cpp
54 */
55 
56 /**
57  * @class hasTag<>
58  * @brief each message type has 16 bit tag
59  * @tparam tag 16 bit unsigned tag
60  */
61 template <uint16_t tag>
62 struct hasTag {
63  struct tagBase {
64  using type = hasTag<tag>;
65  enum { size = 0 };
66  };
67  enum {
68  typeTag = tag,
69  typeSortIndex = tag,
70  hasRange = 0,
71  justBytes = 0,
72  };
73 
74  friend
75  std::ostream&
76  operator << (std::ostream & os, hasTag const& t) {
77  return os << "hasTag=" << tag;
78  }
79  uint16_t constexpr getTypeTag() const {
80  return typeTag;
81  }
82 };
83 
84 template <uint16_t tagStart, uint16_t rangeSize>
85 struct inTagRange {
86  struct tagBase {
88  enum { size = sizeof(type)};
89  };
90 
91  enum {
92  typeSortIndex = tagStart,
93  hasRange = 1,
94  justBytes = 0,
95  };
96  static constexpr uint16_t typeTagStart = tagStart;
97  static constexpr uint16_t typeTagRange = rangeSize;
98 
99  inTagRange(uint16_t offset)
100  : typeTagOffsetInRange(offset) {
101  if (offset >= typeTagRange) {
102  HMBDC_THROW(std::out_of_range, offset << " too big for typeTagRange=" << typeTagRange);
103  }
104  }
105 
106  uint16_t getTypeTag() const { return typeTagStart + typeTagOffsetInRange; }
107  void resetTypeTagOffsetInRange(uint16_t newOffset) {const_cast<XmitEndian<uint16_t>&>(typeTagOffsetInRange) = newOffset;}
108  XmitEndian<uint16_t> const typeTagOffsetInRange{0};
109  friend
110  std::ostream&
111  operator << (std::ostream & os, inTagRange const& t) {
112  return os << "inTagRange tag=" << t.typeTagStart + t.typeTagOffsetInRange;
113  }
114 };
115 
116 HMBDC_CLASS_HAS_DECLARE(hmbdcShmRefCount);
117 /**
118  * @class hasMemoryAttachment
119  * @brief if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message
120  * with memory attachment, the message needs to be derived from this base - as the FIRST base,
121  * so it can be handled properly by the hmbdc network transport when sending and receiving it.
122  * @details user on the sending side cannot directly free the attachment, instead the user can provide
123  * a callback func and hmbdc will call it - the default callback function is to call free on the attachment
124  */
126  /**
127  * @brief default ctor
128  * @details assuming attachment will be allocated and release thru heap
129  */
131  : afterConsumedCleanupFunc(hasMemoryAttachment::free)
132  , attachment(NULL)
133  , len(0) {
134  }
135  enum {
136  flag = 1,
137  is_att_0cpyshm = 0,
138  };
139 
140  /**
141  * @brief file mapped memory
142  * @param fileName file name to map
143  */
144  hasMemoryAttachment(char const* fileName)
145  : afterConsumedCleanupFunc(nullptr)
146  , attachment(nullptr)
147  , len(0) {
148  map(fileName);
149  }
150 
151  void release() {
152  if (afterConsumedCleanupFunc) {
153  afterConsumedCleanupFunc(this);
154  afterConsumedCleanupFunc = nullptr;
155  attachment = nullptr;
156  }
157  }
158 
159  using AfterConsumedCleanupFunc = void (*)(hasMemoryAttachment*);
160  mutable
161  AfterConsumedCleanupFunc afterConsumedCleanupFunc; /// pointing to a func that handles cleaning up the attachment
162  /// it is automatically set, but user can change it as desire
163  /// when the sending side is done with this message,
164  /// user code should never explicitly call this function
165  union {
166  void* attachment; /// pointer to the attachment bytes - see clientData data member
167  /// usage if those bytes are already managed by shared_ptr mechanism
168  boost::interprocess::managed_shared_memory
169  ::handle_t shmHandle;
170  };
171  template <MessageC Message>
172  bool holdShmHandle() const {
173  if constexpr (has_hmbdcShmRefCount<Message>::value) {
174  return Message::is_att_0cpyshm || len >= 1 * 1024 * 1024;
175  }
176  return false;
177  }
178 
179  mutable
180  XmitEndian<size_t> len; /// byte size of the above
181 
182  mutable
183  uint64_t clientData[2] = {0}; /// client can put in any data - it will be transferred around
184  /// typically to be used to help in afterConsumedCleanupFunc.
185  ///
186  /// for example, when the attached memory is managed by shared_ptr<uint_8[]>:
187  ///
188  /// using SP = std::shared_ptr<uint8_t[]>;
189  /// auto sp = SP(new uint8_t[a.len]);
190  /// MemoryAttachment3 a3;
191  /// a3.len = a.len;
192  /// a3.attachment = sp.get();
193  /// new (a3.clientData) SP{sp};
194  /// a3.afterConsumedCleanupFunc = [](hasMemoryAttachment* h) {
195  /// ((SP*)h->clientData)->~SP();
196  /// };
197 
198  static void unmap(hasMemoryAttachment*);
199  static void free(hasMemoryAttachment*);
200  size_t map(char const* fileName);
201 } __attribute__((aligned (8)));
202 
203 /**
204  * @class MessageHead
205  * @details It is recommneded that a Message type do not have significant dtor defined
206  * because the message could travel through process bundaries or through network
207  * - won't compile if trying to send them.
208  * The only exception is the BlockingContext (licensed package), which does allow
209  * message's dtors and call them properly within a process boundary.
210  * @snippet hmbdc.cpp define a message
211  */
212 struct MessageHead {
213  MessageHead(uint16_t typeTag)
214  : reserved2(0)
215  , reserved(0)
216  , typeTag(typeTag)
217  {}
218  template <typename Message> Message& get();
219  template <typename Message> Message const& get() const;
220  template <typename Message> void setPayload(Message const&);
221  template <typename Message, typename ...Args> void setPayloadInPlace(Args&& ... args);
222  uint32_t reserved2;
223  uint16_t reserved;
224 
225  XmitEndian<uint16_t> typeTag;
226 
227  using Scratchpad =
228  union {
229  // XmitEndian<uint16_t> payloadLen;
230  struct {
231  char forbidden[sizeof(pid_t)];
232  uint8_t flag; /// not usable when inbandUnderlyingTypeTag is used
233  } __attribute__((packed)) desc;
235  union {
236  struct {
237  pid_t from;
238  uint16_t inbandUnderlyingTypeTag : 16;
239  } __attribute__((packed)) hd;
240  uint64_t inbandPayloadLen : 48;
241  } __attribute__((packed)) ipc;
242  uint64_t reserved : 48;
243  uint8_t space[6];
244  } __attribute__((packed));
245  static_assert(sizeof(Scratchpad) == sizeof(reserved2) + sizeof(reserved), "");
246  static_assert(sizeof(Scratchpad::seq) == sizeof(reserved2) + sizeof(reserved), "");
247  Scratchpad& scratchpad() {return *reinterpret_cast<Scratchpad*>(&reserved2);}
248  Scratchpad const& scratchpad() const{return *reinterpret_cast<Scratchpad const*>(&reserved2);}
249  friend
250  std::ostream& operator << (std::ostream& os, MessageHead const & h) {
251  return os << h.scratchpad().reserved << ' ' << h.typeTag;
252  }
253 
254  template <MessageC Message>
255  static MessageHead const& retrieveFrom(Message const& m) {
256  auto addr = (char const*)&m - sizeof(MessageHead);
257  return *(MessageHead const*)addr;
258  }
259 } __attribute__((packed));
260 static_assert(sizeof(MessageHead) == 8, "pakcing issue?");
261 
262 template <typename Message>
264  template <typename ...Args>
265  explicit MessageWrap(Args&& ... args)
266  : MessageHead(0) //delayed
267  , payload(std::forward<Args>(args)...) {
268  // static_assert(std::is_base_of<hasTag<Message::typeTag>, Message>::value
269  // , "not a properly defined Message");
270  if constexpr (std::is_base_of<hasMemoryAttachment, Message>::value) {
271  this->scratchpad().desc.flag = hasMemoryAttachment::flag;
272  }
273  if constexpr (Message::hasRange) {
274  this->typeTag = payload.getTypeTag();
275  } else {
276  this->typeTag = Message::typeTag;
277  }
278  }
279 
280  MessageWrap(MessageWrap const&) = default;
281  MessageWrap(MessageWrap &) = default;
282  MessageWrap(MessageWrap &&) = default;
283  MessageWrap& operator = (MessageWrap const&) = default;
284  MessageWrap& operator = (MessageWrap &&) = default;
285 
286  Message payload;
287 
288  friend
289  std::ostream& operator << (std::ostream& os, MessageWrap<Message> const & w) {
290  return os << static_cast<MessageHead const&>(w) << ' ' << w.payload;
291  }
292 };
293 
294 template <typename Message>
295 Message&
296 MessageHead::get() {
297  return static_cast<MessageWrap<Message>*>(this)->payload;
298 }
299 
300 template <typename Message>
301 Message const&
302 MessageHead::get() const{
303  return static_cast<MessageWrap<Message> const*>(this)->payload;
304 }
305 
306 template <typename Message>
307 void
308 MessageHead::
309 setPayload(Message const& m) {
310  new (&get<Message>()) Message(m);
311  typeTag = Message::typeTag;
312 }
313 
314 template <typename Message, typename ...Args>
315 void
316 MessageHead::
317 setPayloadInPlace(Args&& ... args) {
318  new (&get<Message>()) Message(std::forward<Args>(args)...);
319  typeTag = Message::typeTag;
320 }
321 
322 struct Flush
323 : hasTag<0> {
324 };
325 
326 template <size_t MaxStreamableSize>
327 struct LoggingT
328 : hasTag<3> {
329  char payload[MaxStreamableSize];
330 } __attribute__((__may_alias__, packed));
331 
332 /**
333  * @class JustBytes
334  * @brief A special type of message only used on the receiving side
335  * @details When a Client declare this type among its interested message types, all
336  * messages types (WITHOUT the destructor) will be delievered in
337  * byte form to this client. It is normmaly listed as the last Message type to
338  * in the interested message type list to catch all
339  * See in example hmbdc.cpp @snippet hmbdc.cpp use JustBytes instead of Message types
340  */
341 struct JustBytes
342 : hasTag<4> {
343  enum {
344  justBytes = 1,
345  typeSortIndex = 65535,
346  };
347  uint8_t bytes[1];
348 
349  JustBytes(void const* bytesIn, size_t len) {
350  memcpy(bytes, bytesIn, len);
351  }
352 
353  JustBytes(JustBytes const&) = delete;
354 };
355 
356 struct Trace {
357  Trace()
358  : hmbdc_ctor_ts(time::SysTime::now()){}
359  time::SysTime hmbdc_ctor_ts;
360  time::SysTime hmbdc_net_queued_ts;
361  time::SysTime hmbdc_dispatched_ts;
362 
363  uint8_t tsIndex;
364  friend
365  std::ostream& operator << (std::ostream& os, Trace const & t) {
366  return os << t.hmbdc_ctor_ts << ' ' << t.hmbdc_net_queued_ts << ' ' << t.hmbdc_dispatched_ts;
367  }
368 };
369 
370 template<>
372  MessageWrap(uint16_t tag, void const* bytes, size_t len, hasMemoryAttachment const* att)
373  : MessageHead(tag)
374  , payload(bytes, len) {
375  if (att) {
376  this->scratchpad().desc.flag = hasMemoryAttachment::flag;
377  }
378  }
379  JustBytes payload;
380 
381  friend
382  std::ostream& operator << (std::ostream& os, MessageWrap const & w) {
383  return os << static_cast<MessageHead const&>(w) << " *";
384  }
385 };
386 
387 static_assert(sizeof(MessageWrap<JustBytes>) == sizeof(MessageHead) + 1, "wrong packing");
388 static_assert(sizeof(MessageWrap<Flush>) == sizeof(MessageHead) + sizeof(Flush), "wrong packing");
389 
390 template <MessageC Message>
392  static bool constexpr value = [](){
393  if constexpr (std::is_base_of<hasMemoryAttachment, Message>::value) {
394 #if __GNUC_PREREQ(10,0)
395  return offsetof(Message, hasMemoryAttachment::attachment) == 0;
396 #else
397  return true;
398 #endif
399  } else {
400  return false;
401  }
402  }();
403 };
404 
406 : hasTag<5> {
407  XmitEndian<uint16_t> inbandUnderlyingTypeTag;
409  XmitEndian<size_t> segCount;
410 
411  friend
412  std::ostream& operator << (std::ostream& os, StartMemorySegTrain const & m) {
413  return os << "StartAttachmentTrain " << m.inbandUnderlyingTypeTag
414  << ' ' << ' ' << m.segCount;
415  }
416 } __attribute__((aligned(8)));
417 
418 struct MemorySeg
419 : hasTag<6> { //none of the following reaches the receiving side - only used on sedning side
420  void const * seg;
421  uint16_t len;
422  uint16_t inbandUnderlyingTypeTag{0};
423  friend
424  std::ostream& operator << (std::ostream& os, MemorySeg const & m) {
425  return os << "MemorySeg " << m.seg << ' ' << m.len << ' ' << m.inbandUnderlyingTypeTag;
426  }
427 } __attribute__((packed));
428 
429 template <MessageC Message>
431 : hasTag<7> {
432 
433  template <typename ShmAllocator>
434  void shmConvert(ShmAllocator&& allocator) {
435  hasMemoryAttachment& att = underlyingMessage;
436  assert(att.holdShmHandle<Message>());
437  if constexpr (Message::is_att_0cpyshm) {
438  /// already in shm
439  att.shmHandle = allocator.getHandle(att.attachment);
440  } else {
441  auto shmAddr = allocator.allocate(att.len);
442  memcpy(shmAddr, att.attachment, att.len);
443  att.shmHandle = allocator.getHandle(shmAddr);
444  }
445  }
446 
447  Message underlyingMessage;
448 private:
449  template <MessageC M>
450  friend struct MessageWrap;
451 
452  template <typename ...Args>
453  explicit InBandHasMemoryAttachment(Args&&... args)
454  : underlyingMessage(std::forward<Args>(args)...) {}
455  using M = typename std::decay<Message>::type;
456  static_assert(std::is_trivially_destructible<M>::value, "cannot send message with dtor");
457 } __attribute__((aligned(8)));
458 
460 : hasTag<8> {
461  char seg[1]; //open end
462  friend
463  std::ostream& operator << (std::ostream& os, InBandMemorySeg const & m) {
464  return os << "MemorySegInBand";
465  }
466 } __attribute__((packed));
467 
468 template<MessageC Message>
470  template <typename ...Args>
471  explicit MessageWrap(Args&& ...args)
473  , payload(std::forward<Args>(args)...) {
474  this->scratchpad().ipc.hd.inbandUnderlyingTypeTag
475  = payload.underlyingMessage.getTypeTag();
476  }
478 
479  friend
480  std::ostream& operator << (std::ostream& os
482  return os << static_cast<MessageHead const&>(w) << " *";
483  }
484 };
485 
486 template<>
488  MessageWrap(uint16_t tag, void const* bytes, size_t len, hasMemoryAttachment* att)
490  , payload(bytes, len) {
491  this->scratchpad().ipc.hd.inbandUnderlyingTypeTag = tag;
492  }
494 
495  friend
496  std::ostream& operator << (std::ostream& os
498  return os << static_cast<MessageHead const&>(w) << " *";
499  }
500 };
501 
502 /**
503  * @class LastSystemMessage
504  * @brief hmbdc system messages use tag values less than this one
505  */
507 : hasTag<999> {
508 };
509 
510 template <MessageC Message>
511 constexpr bool typeTagMatch(uint16_t tag) {
512  if constexpr (Message::justBytes) {
513  return tag > LastSystemMessage::typeTag;
514  }
515  if constexpr (Message::hasRange) {
516  return tag >= Message::typeTagStart
517  && tag < Message::typeTagStart + Message::typeTagRange;
518  } else {
519  return tag == Message::typeTag;
520  }
521 }
522 
523 inline
524 size_t
525 hasMemoryAttachment::
526 map(char const* fileName) {
527  using namespace std;
528  int fd;
529  struct stat sb;
530 
531  fd = open(fileName, O_RDONLY);
532  fstat(fd, &sb);
533  len = (size_t)sb.st_size;
534  attachment = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
535  if (attachment == MAP_FAILED) {
536  HMBDC_THROW(std::runtime_error, "map failed for " << fileName);
537  }
538  close(fd);
539  afterConsumedCleanupFunc = hasMemoryAttachment::unmap;
540  return len;
541 }
542 
543 inline
544 void
545 hasMemoryAttachment::
546 free(hasMemoryAttachment* a) {
547  ::free(a->attachment);
548  a->attachment = nullptr;
549 }
550 
551 inline
552 void
555  munmap(a->attachment, a->len);
556  a->attachment = nullptr;
557 }
558 }} //hmbdc::app
559 
Definition: Message.hpp:371
hasMemoryAttachment(char const *fileName)
file mapped memory
Definition: Message.hpp:144
Definition: Message.hpp:63
Definition: Message.hpp:86
boost::interprocess::managed_shared_memory ::handle_t shmHandle
Definition: Message.hpp:169
Definition: TypedString.hpp:84
each message type has 16 bit tag
Definition: Message.hpp:62
hasMemoryAttachment()
default ctor
Definition: Message.hpp:130
Definition: Message.hpp:327
Definition: Message.hpp:85
static void unmap(hasMemoryAttachment *)
Definition: Message.hpp:554
Definition: Message.hpp:322
Definition: Message.hpp:212
A special type of message only used on the receiving side.
Definition: Message.hpp:341
Definition: Message.hpp:405
Definition: Time.hpp:14
Definition: Message.hpp:459
__attribute__((packed)) desc
not usable when inbandUnderlyingTypeTag is used
hmbdc system messages use tag values less than this one
Definition: Message.hpp:506
Definition: Message.hpp:356
Definition: Message.hpp:263
Definition: Message.hpp:418
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Base.hpp:12
uint64_t clientData[2]
byte size of the above
Definition: Message.hpp:183
void shmConvert(ShmAllocator &&allocator)
Definition: Message.hpp:434
Definition: Message.hpp:430