hmbdc
simplify-high-performance-messaging-programming
MessageDispacher.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/app/Message.hpp"
5 #include "hmbdc/app/Logger.hpp"
6 #include "hmbdc/time/Time.hpp"
7 #include "hmbdc/MetaUtils.hpp"
8 
9 namespace hmbdc { namespace app {
10 
11 namespace messagehandler_detail {
12 HMBDC_CLASS_HAS_DECLARE(ibmaProc);
13 HMBDC_CLASS_HAS_DECLARE(hmbdc_dispatched_ts);
14 HMBDC_CLASS_HAS_DECLARE(hmbdcName);
15 HMBDC_CLASS_HAS_DECLARE(hmbdcAvoidIpcFrom);
16 
/**
 * @brief three-way comparison of a runtime type tag against the tag (or tag range)
 * declared by Message
 *
 * @tparam Message a message type exposing a compile-time `hasRange` flag plus either
 * `typeTagStart` / `typeTagRange` (when hasRange is true) or `typeTag` (otherwise)
 * @param tag the runtime tag to classify
 * @return -1 when tag sorts before Message's tag(s), 1 when it sorts after,
 * 0 when it equals the tag or falls inside [typeTagStart, typeTagStart + typeTagRange)
 */
template <typename Message>
constexpr int compare(uint16_t tag) {
    if constexpr (Message::hasRange) {
        if (tag < Message::typeTagStart) return -1;
        // the range is half open: the first tag past the range compares as "after"
        return tag >= Message::typeTagStart + Message::typeTagRange ? 1 : 0;
    } else {
        if (tag < Message::typeTag) return -1;
        return tag > Message::typeTag ? 1 : 0;
    }
}
29 
30 template <typename CcClient, typename Message>
31 struct dispatcher {
32  bool operator()(CcClient& c, MessageHead& m) {
33  if (typeTagMatch<Message>((uint16_t)m.typeTag)) {
34  if constexpr (has_hmbdcName<CcClient>::value) {
35  HMBDC_LOG_r(c.hmbdcName(), " (", this, ") <=== ", m.get<Message>());
36  }
37  if constexpr (has_hmbdc_dispatched_ts<Message>::value) {
38  m.get<Message>().hmbdc_dispatched_ts = hmbdc::time::SysTime::now();
39  }
40  auto& msg = m.get<Message>();
41  if constexpr (has_hmbdcAvoidIpcFrom<CcClient>::value) {
42  if (hmbdc_likely(c.hmbdcAvoidIpcFrom != m.scratchpad().ipc.hd.from)) {
43  if constexpr (Message::justBytes) {
44  c.handleJustBytesCb(m.typeTag, msg.bytes
45  , m.scratchpad().desc.flag? (hasMemoryAttachment*)msg.bytes : nullptr);
46  } else {
47  c.handleMessageCb(msg);
48  }
49  }
50  } else {
51  if constexpr (Message::justBytes) {
52  c.handleJustBytesCb(m.typeTag, msg.bytes
53  , m.scratchpad().desc.flag? (hasMemoryAttachment*)msg.bytes : nullptr);
54  } else {
55  c.handleMessageCb(msg);
56  }
57  }
58  using Wrap = MessageWrap<Message>;
59  auto& wrap = static_cast<Wrap&>(m);
60  wrap.~Wrap();
61  return true;
62  } else if constexpr((std::is_base_of<hasMemoryAttachment, Message>::value
63  || Message::justBytes) && has_ibmaProc<CcClient>::value) {
64  if constexpr(std::is_same<std::nullptr_t, decltype(c.ibmaProc)>::value) {
65  return false;
66  } else {
67  auto& ibmaproc = c.ibmaProc;
68  if (ibmaproc.att->attachment) {
69  if (hmbdc_unlikely(m.typeTag != InBandMemorySeg::typeTag)) return false;
70  if (ibmaproc.accumulate(m.get<InBandMemorySeg>(), m.scratchpad().ipc.inbandPayloadLen)) {
71  if (typeTagMatch<Message>(ibmaproc.typeTag)) {
72  auto* msg = reinterpret_cast<Message*>(ibmaproc.underlyingMessage); (void)msg;
73  if constexpr (has_hmbdcName<CcClient>::value) {
74  HMBDC_LOG_r(c.hmbdcName(), " (", this, ") <=== ", *msg);
75  }
76  if constexpr (has_hmbdc_dispatched_ts<Message>::value) {
77  msg->hmbdc_dispatched_ts = hmbdc::time::SysTime::now();
78  }
79 
80  if constexpr (Message::justBytes) {
81  c.handleJustBytesCb(ibmaproc.typeTag, ibmaproc.underlyingMessage, ibmaproc.att);
82  ibmaproc.att->release();
83  } else {
84  c.handleMessageCb(*msg);
85  msg->release();
86  }
87  // ibmaproc.att->attachment = nullptr;
88  return true;
89  } else {
90  return false;
91  }
92  } else {
93  return true;
94  }
95  } else if (m.typeTag == InBandHasMemoryAttachment<void*>::typeTag) {
96  auto underlyTag = m.scratchpad().ipc.hd.inbandUnderlyingTypeTag;
97  if (typeTagMatch<Message>((uint16_t)underlyTag)) {
98  auto& ibma = m.get<InBandHasMemoryAttachment<Message>>();
99  if constexpr (has_hmbdcAvoidIpcFrom<CcClient>::value) {
100  if (hmbdc_likely(c.hmbdcAvoidIpcFrom == m.scratchpad().ipc.hd.from)) {
101  return true; // skip the rest
102  }
103  }
104 
105  if (ibmaproc.cache(ibma, underlyTag)) {
106  auto* msg = reinterpret_cast<Message*>(ibmaproc.underlyingMessage);(void)msg;
107  if constexpr (has_hmbdcName<CcClient>::value) {
108  HMBDC_LOG_r(c.hmbdcName(), " (", this, ") <=== ", *msg);
109  }
110  if constexpr (has_hmbdc_dispatched_ts<Message>::value) {
111  msg->hmbdc_dispatched_ts = hmbdc::time::SysTime::now();
112  }
113 
114  if constexpr (Message::justBytes) {
115  c.handleJustBytesCb(ibmaproc.typeTag, ibmaproc.underlyingMessage, ibmaproc.att);
116  ibmaproc.att->release();
117  } else {
118  c.handleMessageCb(*msg);
119  msg->release();
120  }
121  }
122  return true;
123  }
124  return false;
125  } //return false;
126  }
127  }
128  return false;
129  }
130 };
131 
/**
 * @brief ordering predicate for message types, usable at compile time
 *
 * @details `value` is true when T::typeSortIndex is strictly less than
 * U::typeSortIndex. It is passed to sort_tuple elsewhere in this file.
 *
 * @tparam T left-hand message type
 * @tparam U right-hand message type
 */
template <class T, class U>
struct ascending_by_tag
: std::integral_constant<bool, (T::typeSortIndex < U::typeSortIndex)>
{};
136 
137 template <typename CcClient, typename>
138 struct SeqMessageDispacher {
139  bool operator()(CcClient&, MessageHead&) const{return false;}
140 };
141 
142 template <typename CcClient, typename M, typename ... Messages>
143 struct SeqMessageDispacher<CcClient, std::tuple<M, Messages...>> {
144  bool operator()(CcClient& c, MessageHead& e) const {
145  if (!dispatcher<CcClient, M>()(c, e))
146  return SeqMessageDispacher<CcClient, std::tuple<Messages ...>>()(c, e);
147  else
148  return true;
149  }
150 };
151 
// Primary template: operator() is only declared here, no definition is visible
// in this listing.
152 template <typename CcClient, typename>
153 struct SortedMessageDispacher {
154  bool operator()(CcClient&, MessageHead&) const;
155 };
156 
// Specialization for a tuple of message types.
// With more than 16 types the tuple is ordered with sort_tuple / ascending_by_tag
// and searched through a lambda; otherwise the call is forwarded to
// SeqMessageDispacher, which tries the types one after another.
// NOTE(review): the embedded listing numbers jump 164 -> 166 and 167 -> 169, so two
// source lines are missing from this copy: the expression that receives the lambda
// (before line 166) and the condition guarding `return 0;` (before line 169).
// Restore them from the original header before compiling.
157 template <typename CcClient, typename ... Messages>
158 struct SortedMessageDispacher<CcClient, std::tuple<Messages...>> {
159  bool operator()(CcClient& c, MessageHead& e) const {
160  if constexpr (sizeof...(Messages) > 16) {
161  using SortedTupleMessages
162  = typename sort_tuple<messagehandler_detail::ascending_by_tag
163  , std::tuple<Messages...>>::type;
164 
// the lambda receives a pointer whose pointee type selects the candidate message type M
166  [&c, &e](auto* t) {
167  using M = typename std::remove_reference<decltype(*t)>::type;
169  return 0;
170  }
// otherwise report how e.typeTag orders relative to M (-1 / 0 / 1, see compare)
171  return messagehandler_detail::compare<M>(e.typeTag);
172  }
173  );
174  } else {
175  return messagehandler_detail::SeqMessageDispacher<
176  CcClient, std::tuple<Messages ...>>()(c, e);
177  }
178  }
179 };
180 
181 }
182 
// Public entry point: forwards (c, e) to
// messagehandler_detail::SortedMessageDispacher using the SortedTupleMessages alias.
// NOTE(review): the embedded listing numbers skip 184 and 187, so the class head
// (presumably `struct MessageDispacher {` - confirm) and the right-hand side of the
// SortedTupleMessages alias are missing from this copy. The cross-reference text at
// the end of the listing gives the alias as
// `typename sort_tuple< messagehandler_detail::ascending_by_tag, MessageTuple >::type`.
183 template <typename CcClient, typename MessageTuple>
185  /// place JustBytes and the types derived from it last
186  using SortedTupleMessages
// returns whatever SortedMessageDispacher returns for this client / message head
188  bool operator()(CcClient& c, MessageHead& e) const {
189  return messagehandler_detail::SortedMessageDispacher<CcClient, SortedTupleMessages>()(c, e);
190  }
191 };
192 
193 }}
Definition: MessageDispacher.hpp:31
Definition: MetaUtils.hpp:156
Definition: MetaUtils.hpp:139
typename sort_tuple< messagehandler_detail::ascending_by_tag, MessageTuple >::type SortedTupleMessages
place JustBytes and the types derived from it last
Definition: MessageDispacher.hpp:187
Definition: MessageDispacher.hpp:184
Definition: Message.hpp:212
Definition: Message.hpp:459
Definition: Message.hpp:263
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:125
Definition: Base.hpp:12
Definition: Message.hpp:430