hmbdc
simplify-high-performance-messaging-programming
Messages.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/time/Time.hpp"
5 #include "hmbdc/app/Message.hpp"
6 #include "hmbdc/Config.hpp"
7 #include "hmbdc/Compile.hpp"
8 
9 #include <ostream>
10 #include <string>
11 #include <limits>
12 
13 #include <netinet/in.h>
14 #include <stdio.h>
15 #include <unistd.h>
16 #include <memory.h>
17 
18 namespace hmbdc { namespace tips { namespace rmcast {
19 
// Flag byte for this transport message; per the original note, value 1 marks "hasAttachment".
21  uint8_t flag = 0; //1 - hasAttachment
// Payload length; wireSize() adds it to sizeof(TransportMessageHeader) to get the total wire size.
22  XmitEndian<uint16_t> messagePayloadLen;
/**
 * @brief upper bound on the payload size of a single transport message
 * @return always 1023
 */
static size_t maxPayloadSize() {
    constexpr size_t kMaxPayload = 1023;
    return kMaxPayload;
}
26 
27  void const* payload() const {
28  return reinterpret_cast<const char*>(this)
29  + sizeof(TransportMessageHeader);
30  }
31 
32  void * payload() {
33  return reinterpret_cast<char*>(this)
34  + sizeof(TransportMessageHeader);
35  }
36 
37  app::MessageHead& messageHead() {
38  return *static_cast<app::MessageHead*>(payload());
39  }
40 
41  app::MessageHead const& messageHead() const {
42  return *static_cast<app::MessageHead const*>(payload());
43  }
44 
45  uint16_t typeTag() const {
46  auto h = static_cast<app::MessageHead const*>(payload());
47  return h->typeTag;
48  }
49 
50  template <typename Message>
51  Message& wrapped() {
52  auto wrap = static_cast<app::MessageWrap<Message>*>(payload());
53  return wrap->payload;
54  }
55 
56  template <typename Message>
57  Message const& wrapped() const {
58  auto wrap = static_cast<app::MessageWrap<Message> const *>(payload());
59  return wrap->payload;
60  }
61 
62  size_t wireSize() const {
63  return sizeof(TransportMessageHeader) + messagePayloadLen;
64  }
65 
66  void const* wireBytes() const {
67  return reinterpret_cast<const char*>(this);
68  }
69 
70  void const* wireBytesMemorySeg() const {
71  return wrapped<app::MemorySeg>().seg;
72  }
73 
74  size_t wireSizeMemorySeg() const {
75  return wrapped<app::MemorySeg>().len;
76  }
77 
78  HMBDC_SEQ_TYPE getSeq() const {
79  auto res = (HMBDC_SEQ_TYPE)static_cast<app::MessageHead const*>(payload())->scratchpad().seq;
80  if (hmbdc_unlikely(res == 0xfffffffffffful)) return std::numeric_limits<HMBDC_SEQ_TYPE>::max();
81  return res;
82  }
83 
84  void setSeq(HMBDC_SEQ_TYPE seq) {
85  static_cast<app::MessageHead*>(payload())->scratchpad().seq = seq;
86  }
87 
88 } __attribute__((packed));
89 
91 : app::hasTag<453> {
92  TypeTagBackupSource(std::string const& ipIn
93  , uint16_t portIn
94  , time::Duration recvReportDelayIn
95  , bool loopbackIn
96  )
97  : port(portIn)
98  , recvReportDelay(recvReportDelayIn)
99  , timestamp(time::SysTime::now())
100  , loopback(loopbackIn) {
101  snprintf(ip, sizeof(ip), "%s", ipIn.c_str());
102  srcPid = getpid();
103  }
// NUL-terminated text copied from the constructor's ipIn (truncated to 15 characters by snprintf).
104  char ip[16];
// Number of entries of typeTags currently in use; starts at 0 and is advanced by addTypeTag().
105  XmitEndian<uint16_t> typeTagCountContained{0};
// Fixed-capacity table of type tags; addTypeTag() refuses to add beyond 64 entries.
106  XmitEndian<uint16_t> typeTags[64];
107 
108  bool addTypeTag(uint16_t tag) {
109  auto c = uint16_t{typeTagCountContained};
110  if (c < 64) {
111  typeTags[c++] = tag;
112  typeTagCountContained = c;
113  return true;
114  }
115  return false;
116  }
// Set from the constructor's recvReportDelayIn.
118  time::Duration recvReportDelay; //reporting back the seq expecting every so often
// Captured with time::SysTime::now() when this object is constructed.
119  time::SysTime timestamp; //when the source started
// Set from the constructor's loopbackIn.
120  bool loopback;
// Not assigned by the constructor; per the original note the receiving side fills it in.
121  sockaddr_in sendFrom; //to be filled by recv side
// Assigned getpid() of the constructing process.
122  XmitEndian<uint32_t> srcPid;
123 
124 
125  friend
126  std::ostream& operator << (std::ostream& os, TypeTagBackupSource const & m) {
127  os << "TypeTagBackupSource:" << m.ip << ':' << m.port;
128  for (auto i = 0u; i < m.typeTagCountContained; i++) {
129  os << ' ' << m.typeTags[i];
130  }
131  return os;
132  }
133 };
134 
135 /**
136  * @class SessionStarted
137  * @brief this message appears in the receiver's buffer indicating a new source is connected
138  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this message
139  */
141 : app::hasTag<454> {
// Text streamed by operator<< after "Session to topic source started "; must be NUL-terminated for that to be safe.
142  char ip[16];
143  friend
144  std::ostream& operator << (std::ostream& os, SessionStarted const & m) {
145  return os << "Session to topic source started " << m.ip;
146  }
147 };
148 
149 /**
150  * @class SessionDropped
151  * @brief this message appears in the receiver's buffer indicating a previously connected source is dropped
152  * @details only appears on the receiving side, and the receiver buffer is big enough to hold this message
153  */
155 : app::hasTag<455> {
// Text streamed by operator<< after "Session to topic source dropped "; must be NUL-terminated for that to be safe.
156  char ip[16];
157  friend
158  std::ostream& operator << (std::ostream& os, SessionDropped const & m) {
159  return os << "Session to topic source dropped " << m.ip;
160  }
161 };
162 
163 struct SeqAlert
164 : app::hasTag<456> {
165  HMBDC_SEQ_TYPE expectSeq;
166  friend
167  std::ostream& operator << (std::ostream& os, SeqAlert const & m) {
168  return os << "SeqAlert " << m.expectSeq;
169  }
170 };
171 
// Compile-time layout guard: SeqAlert must be exactly as large as its single
// HMBDC_SEQ_TYPE member, i.e. no padding and no unexpected packing in effect.
172 static_assert(sizeof(SeqAlert) == sizeof(HMBDC_SEQ_TYPE)
173  , "do you have a pack pragma unclosed that influencs the above struct packing unexpectedly?");
174 }}}
Definition: Messages.hpp:90
each message type has 16 bit tag
Definition: Message.hpp:62
this message appears in the receiver's buffer indicating a new source is connected ...
Definition: Messages.hpp:140
Definition: Message.hpp:212
Definition: Messages.hpp:163
Definition: Time.hpp:14
Definition: Message.hpp:263
Definition: Time.hpp:134
this message appears in the receiver's buffer indicating a previously connected source is dropped ...
Definition: Messages.hpp:154
Definition: Base.hpp:12