hmbdc
simplify-high-performance-messaging-programming
Messages.hpp
1 
2 #include "hmbdc/Copyright.hpp"
3 #pragma once
4 
5 #include "hmbdc/app/Message.hpp"
6 #include "hmbdc/Config.hpp"
7 #include "hmbdc/Compile.hpp"
8 
9 #include <ostream>
10 #include <string>
11 #include <limits>
12 #include <unistd.h>
13 #include <stdio.h>
14 #include <memory.h>
15 
16 namespace hmbdc { namespace tips { namespace rnetmap {
18  uint8_t flag = 0; //1 - hasAttachment
19  XmitEndian<uint16_t> messagePayloadLen;
// Largest payload size this header type reports; the value is fixed at 1023.
static size_t maxPayloadSize() {
    constexpr size_t kMaxPayloadSize = 1023;
    return kMaxPayloadSize;
}
23 
24  //copy message into the right place in memory
25  template <typename M>
27  copyTo(void* addrIn, M&& m) {
28  using Message = typename std::decay<M>::type;
29  auto addr = (char*)addrIn;
30  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
31  new (addr + sizeof(TransportMessageHeader))
32  app::MessageWrap<Message>(std::forward<M>(m));
33  h->messagePayloadLen = sizeof(app::MessageWrap<Message>);
34  return h;
35  }
36 
38  copyTo(void* addrIn, uint16_t tag, void const* bytes, size_t len) {
39  auto addr = (char*)addrIn;
40  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
41  new (addr + sizeof(TransportMessageHeader)) app::MessageWrap<app::JustBytes>(tag, bytes, len, nullptr);
42  h->messagePayloadLen =
44  return h;
45  }
46 
47  template <typename Message, typename ... Args>
49  copyToInPlace(void* addrIn, Args&&... args) {
50  char* addr = (char*)addrIn;
51  auto h = reinterpret_cast<TransportMessageHeader*>(addr);
52  new (addr + sizeof(TransportMessageHeader)) app::MessageWrap<Message>(std::forward<Args>(args)...);
53  h->messagePayloadLen = sizeof(app::MessageWrap<Message>);
54  return h;
55  }
56 
57  void const* payload() const {
58  return reinterpret_cast<const char*>(this)
59  + sizeof(TransportMessageHeader);
60  }
61 
62  void * payload() {
63  return reinterpret_cast<char*>(this)
64  + sizeof(TransportMessageHeader);
65  }
66 
67  uint16_t typeTag() const {
68  auto h = static_cast<app::MessageHead const*>(payload());
69  return h->typeTag;
70  }
71 
72  template <typename Message>
73  Message& wrapped() {
74  auto wrap = static_cast<app::MessageWrap<Message>*>(payload());
75  return wrap->payload;
76  }
77 
78  template <typename Message>
79  Message const& wrapped() const {
80  auto wrap = static_cast<app::MessageWrap<Message> const *>(payload());
81  return wrap->payload;
82  }
83 
84  size_t wireSize() const {
85  return sizeof(TransportMessageHeader) + messagePayloadLen;
86  }
87 
88  void const* wireBytes() const {
89  return reinterpret_cast<const char*>(this);
90  }
91 
92  void * wireBytes() {
93  return reinterpret_cast<char*>(this);
94  }
95 
96  void const* wireBytesMemorySeg() const {
97  return wrapped<app::MemorySeg>().seg;
98  }
99 
100  size_t wireSizeMemorySeg() const {
101  return wrapped<app::MemorySeg>().len;
102  }
103 
104  HMBDC_SEQ_TYPE getSeq() const {
105  auto res = (HMBDC_SEQ_TYPE)static_cast<app::MessageHead const*>(payload())->scratchpad().seq;
106  if (hmbdc_unlikely(res == 0xfffffffffffful)) return std::numeric_limits<HMBDC_SEQ_TYPE>::max();
107  return res;
108  }
109 
110  void setSeq(HMBDC_SEQ_TYPE seq) {
111  static_cast<app::MessageHead*>(payload())->scratchpad().seq = seq;
112  }
113 
114 } __attribute__((packed));
115 
117 : app::hasTag<553> {
118  TypeTagBackupSource(std::string const& ipIn
119  , uint16_t portIn
120  , time::Duration recvReportDelayIn
121  , bool loopbackIn)
122  : port(portIn)
123  , recvReportDelay(recvReportDelayIn)
124  , loopback(loopbackIn) {
125  srcPid = getpid();
126  snprintf(ip, sizeof(ip), "%s", ipIn.c_str());
127  }
128  char ip[16];
129  XmitEndian<uint16_t> typeTagCountContained{0};
130  XmitEndian<uint16_t> typeTags[64];
131  bool addTypeTag(uint16_t tag) {
132  auto c = uint16_t{typeTagCountContained};
133  if (c < 64) {
134  typeTags[c++] = tag;
135  typeTagCountContained = c;
136  return true;
137  }
138  return false;
139  }
140 
142  time::Duration recvReportDelay; //reporting back the seq expecting every so often
143  bool loopback;
144  uint32_t sendFrom; //to be filled by recv side
145  XmitEndian<uint32_t> srcPid;
146 
147  friend
148  std::ostream& operator << (std::ostream& os, TypeTagBackupSource const & m) {
149  os << "TypeTagBackupSource:" << m.ip << ':' << m.port;
150  for (auto i = 0u; i < m.typeTagCountContained; i++) {
151  os << ' ' << m.typeTags[i];
152  }
153  return os;
154  }
155 };
156 
157 /**
158  * @class SessionStarted
159  * @brief this message appears in the receiver's buffer indicating a new source is connected
160  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this message
161  */
163 : app::hasTag<554> {
164  char ip[16];
165  friend
166  std::ostream& operator << (std::ostream& os, SessionStarted const & m) {
167  return os << "Session to type tag source started " << m.ip;
168  }
169 };
170 
171 /**
172  * @class SessionDropped
173  * @brief this message appears in the receiver's buffer indicating a previously connected source is dropped
174  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this message
175  */
177 : app::hasTag<555> {
178  char ip[16];
179  friend
180  std::ostream& operator << (std::ostream& os, SessionDropped const & m) {
181  return os << "Session to tag tag source dropped " << m.ip;
182  }
183 };
184 
185 struct SeqAlert
186 : app::hasTag<556> {
187  HMBDC_SEQ_TYPE expectSeq;
188  friend
189  std::ostream& operator << (std::ostream& os, SeqAlert const & m) {
190  return os << "SeqAlert " << m.expectSeq;
191  }
192 };
193 
194 static_assert(sizeof(SeqAlert) == sizeof(HMBDC_SEQ_TYPE)
195  , "do you have a pack pragma unclosed that influencs the above struct packing unexpectedly?");
196 }}}
Definition: Messages.hpp:185
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:176
each message type has 16 bit tag
Definition: Message.hpp:62
Definition: Message.hpp:212
Definition: Messages.hpp:116
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:162
Definition: Message.hpp:263
Definition: Time.hpp:134
Definition: Base.hpp:12