hmbdc
simplify-high-performance-messaging-programming
Node.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/tips/TypeTagSet.hpp"
4 #include "hmbdc/app/Client.hpp"
5 #include "hmbdc/app/ClientWithStash.hpp"
6 #include "hmbdc/MetaUtils.hpp"
7 
8 #include <functional>
9 
10 #include <unistd.h>
11 
12 namespace hmbdc { namespace tips {
13 template <typename... Nodes>
15  using type = std::tuple<>;
16 };
17 
18 template <typename Node, typename... Nodes>
19 struct aggregate_recv_msgs<Node, Nodes ...> {
20  using type = typename hmbdc::merge_tuple_unique<
21  typename Node::RecvMessageTuple, typename aggregate_recv_msgs<Nodes ...>::type
22  >::type;
23 };
24 
25 template <typename... Nodes>
27  using type = std::tuple<>;
28 };
29 
30 template <typename Node, typename... Nodes>
31 struct aggregate_send_msgs<Node, Nodes ...> {
32  using type = typename hmbdc::merge_tuple_unique<
33  typename Node::SendMessageTuple, typename aggregate_send_msgs<Nodes ...>::type
34  >::type;
35 };
36 
37 /**
38  * @brief a Node is a thread of execution that can suscribe and receive Messages
39  * @details All messages are received through callback function. All callback functions
40  * are called in this Node thread sequentially, so there is no data protection needs
41  * within a Node from this perspective.
42  * @tparam CcNode The concrete Node type
43  * @tparam RecvMessageTuple The std tuple list all the received Message types.
44  * The matching handleMessageCb for the above type needs to be provided for each type
45  * so this message is handled - othewise cannot not compile. For example:
46  * void handleMessageCb(MessageA const& m){...}
47  * @tparam HasMessageStash - if the Node needs Message reorderring support
48  * See ClientWithStash.hpp
49  */
50 template <typename CcNode, MessageTupleC RecvMessageTuple, bool HasMessageStash = false>
51 struct Node;
52 
53 template <typename CcNode, MessageC ...RecvMessages, bool HasMessageStash>
54 struct Node<CcNode, std::tuple<RecvMessages...>, HasMessageStash>
55 : std::conditional<HasMessageStash
56  , app::ClientWithStash<CcNode, 0, RecvMessages...>
57  , app::Client<CcNode, RecvMessages...>
58 >::type {
59  enum {
60  manual_subscribe = false, /// if defined as true in CcNode, auto subscribe is
61  /// not happening - User manually call
62  /// Domain::subscribeFor() at the right time
63  };
64  /**
65  * @brief Concrete Node needs to specify a list of Messages that would be sent out
66  * if this Node is active to help IPC and network delivering filtering.
67  * If not fully specified the message could be invisible outside of this process.
68  */
69  using SendMessageTuple = std::tuple<>;
70  using RecvMessageTuple = typename app::Client<CcNode, RecvMessages...>::Interests;
71 
72  /**
73  * @brief the thread name used to identify the thread the Node is running on
74  * - only for display purpose
75  *
76  * @return char const* a string - less than 8 char typically
77  */
78  char const* hmbdcName() const {return "node";}
79 
80  /**
81  * @brief unless JustBytes is used in Interests - this is automatic
82  *
83  * @return size_t what is the biggest message this Node is to receive
84  */
85  size_t maxMessageSize() const {
86  using I = typename CcNode::Interests;
87  static_assert(index_in_tuple<app::JustBytes, I>::value == std::tuple_size<I>::value
88  , "need to override this function since JustBytes is used");
90  }
91 
92  /**
93  * @brief What tags are going to published when JustBytes is in the RecvMessageTuple
94  *
95  * @param addTag functor used to addd a tag value (NOT tag offset)
96  */
97  void addJustBytesSubsFor(std::function<void(uint16_t)> addTag) const {
98  using I = typename CcNode::Interests;
99  static_assert(index_in_tuple<app::JustBytes, I>::value == std::tuple_size<I>::value
100  , "need to override this function since JustBytes is used");
101  // addTag(aTag);
102  }
103 
104  /**
105  * @brief What tags are going to published when JustBytes is in the SendMessageTuple
106  *
107  * @param addTag functor used to addd a tag value (NOT tag offset)
108  */
109  void addJustBytesPubsFor(std::function<void(uint16_t)> addTag) const {
110  using I = typename CcNode::Interests;
111  static_assert(index_in_tuple<app::JustBytes, I>::value == std::tuple_size<I>::value
112  , "need to override this function since JustBytes is used");
113  // addTag(aTag);
114  }
115 
116  /**
117  * @brief The concrete node overrides this function for each inTagRange type of Message
118  * to indicate which type tags in the range this Node subscribes for - otherwise each one
119  * in range is subscribed
120  *
121  * @tparam Message the inTagRange Message
122  * @param addOffsetInRange a functor that adds a type tag into the subscription
123  * Note - the functor does not take a type tag - instead it takes the tag offset
124  * within the range
125  */
126  template <MessageC Message>
127  void addTypeTagRangeSubsFor(Message*, std::function<void(uint16_t)> addOffsetInRange) const {
128  for (auto i = 0; i < Message::typeTagRange; ++i) {
129  addOffsetInRange(i);
130  }
131  }
132 
133  /**
134  * @brief The concrete node overrides this function for each inTagRange type of Message
135  * to indicate which type tags in the range this Node subscribes for - otherwise each one
136  * in range is expected to be published
137  *
138  * @tparam Message the inTagRange Message
139  * @param addOffsetInRange a functor that adds a type tag into the subscription
140  * Note - the functor does not take a type tag - instead it takes the tag offset
141  * within the range
142  */
143  template <MessageC Message>
144  void addTypeTagRangePubsFor(Message*, std::function<void(uint16_t)> addOffsetInRange) const {
145  for (auto i = 0; i < Message::typeTagRange; ++i) {
146  addOffsetInRange(i);
147  }
148  }
149 
150  /**
151  * @brief Domain calls this first before handles the subscritions for this Node
152  *
153  */
155  auto& node = *static_cast<CcNode*>(this);
156  subscription.addSubsFor<typename CcNode::Interests>(node, 1, 0);
157  }
158 
159  /**
160  * @brief an overridable method in the concrete Node that
161  * determine if a message should be deliverred to this Node instance
162  * This is called at the sending part before the Node is unblocked
163  * due to this delivery
164  *
165  * @tparam Message decision is for this Message type
166  * @param message The decision on this Message instance
167  * @return true if proceed to deliever it
168  * @return false if not
169  */
170  template<MessageC Message>
171  bool ifDeliver(Message const& message) const {
172  if constexpr(!Message::hasRange) {
173  return true;
174  } else {
175  return subscription.check(message.getTypeTag());
176  }
177  }
178 
179  /**
180  * @brief an overridable method in the concrete Node that
181  * determine if a message should be deliverred to this Node instance's
182  * JustBytes Interests
183  * This is called at the sending part before the Node is unblocked
184  * due to this delivery
185  *
186  * @tparam Message decision is for this Message type
187  * @param tag message tag
188  * @param bytes message address
189  * @return true if proceed to deliever it
190  * @return false if not
191  */
192  bool ifDeliver(uint16_t tag, uint8_t const* bytes) const {
193  return subscription.check(tag);
194  }
195 
196  /**
197  * @brief this callback is called all the time (frequently) - the exact timing is
198  * after a batch of messages are dispatched. After this call returns, the previously
199  * dispatched message's addresses are no longer valid, which means if you cache the
200  * event addresses in the previous handleMessageCb()s, you cannot use those after
201  * the return of the next invokeCb function.
202  * @details you can collectively process the messages received/cached so far here, or
203  * do something needs to be done all the time like powering another message loop
204  *
205  * @param dispatched the number of messages dispatched since last invokedCb called
206  */
207  void invokedCb(size_t dispatched) override {
208  // called as frequently as workload allowed
209  }
210 
211 protected:
212  TypeTagSet subscription;
213 };
214 
215 template <typename SendMessageTupleIn, typename RecvMessageTupleIn>
217 : Node<RegistrationNode<SendMessageTupleIn, RecvMessageTupleIn>, RecvMessageTupleIn> {
218  using SendMessageTuple = SendMessageTupleIn;
219 };
220 }}
Definition: MetaUtils.hpp:84
void addTypeTagRangePubsFor(Message *, std::function< void(uint16_t)> addOffsetInRange) const
The concrete node overrides this function for each inTagRange type of Message to indicate which type ...
Definition: Node.hpp:144
Definition: TypedString.hpp:84
Definition: MetaUtils.hpp:48
Definition: Node.hpp:14
void updateSubscription()
Domain calls this first before handles the subscritions for this Node.
Definition: Node.hpp:154
char const * hmbdcName() const
the thread name used to identify the thread the Node is running on
Definition: Node.hpp:78
bool ifDeliver(uint16_t tag, uint8_t const *bytes) const
an overridable method in the concrete Node that determine if a message should be deliverred to this N...
Definition: Node.hpp:192
void addJustBytesPubsFor(std::function< void(uint16_t)> addTag) const
What tags are going to published when JustBytes is in the SendMessageTuple.
Definition: Node.hpp:109
void addTypeTagRangeSubsFor(Message *, std::function< void(uint16_t)> addOffsetInRange) const
The concrete node overrides this function for each inTagRange type of Message to indicate which type ...
Definition: Node.hpp:127
Definition: Node.hpp:26
std::tuple<> SendMessageTuple
Concrete Node needs to specify a list of Messages that would be sent out if this Node is active to he...
Definition: Node.hpp:69
Definition: TypeTagSet.hpp:141
a Node is a thread of execution that can suscribe and receive Messages
Definition: Node.hpp:51
Definition: MetaUtils.hpp:100
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:122
void addJustBytesSubsFor(std::function< void(uint16_t)> addTag) const
What tags are going to published when JustBytes is in the RecvMessageTuple.
Definition: Node.hpp:97
void invokedCb(size_t dispatched) override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: Node.hpp:207
size_t maxMessageSize() const
unless JustBytes is used in Interests - this is automatic
Definition: Node.hpp:85
a std tuple holding messages types it can dispatch
Definition: BlockingContext.hpp:206
Definition: Base.hpp:12
Definition: Node.hpp:216
bool ifDeliver(Message const &message) const
an overridable method in the concrete Node that determine if a message should be deliverred to this N...
Definition: Node.hpp:171