hmbdc
simplify-high-performance-messaging-programming
 All Classes Namespaces Functions Variables Friends Pages
ContextDetail.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/app/Client.hpp"
5 #include "hmbdc/app/Message.hpp"
6 #include "hmbdc/comm/Topic.hpp"
7 
8 #include "hmbdc/pattern/PoolT.hpp"
9 #include "hmbdc/pattern/PoolMinus.hpp"
10 #include "hmbdc/pattern/LockFreeBufferT.hpp"
11 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
12 
13 #include "hmbdc/os/Thread.hpp"
14 #include "hmbdc/os/Allocators.hpp"
15 #include "hmbdc/time/Timers.hpp"
16 
17 #include <vector>
18 #include <type_traits>
19 #include <thread>
20 #include <utility>
21 
22 namespace hmbdc { namespace app { namespace context_detail {
23 using namespace std;
24 using namespace hmbdc::pattern;
25 
/**
 * @brief adapter that exposes a CcClient through the pattern::PoolConsumer virtual interface
 * @details every override below forwards to the CcClient implementation of the
 * same name, using a CcClient:: qualified call.
 * NOTE(review): this listing skips original lines 27-28, which should hold the
 * class head (presumably a PoolConsumerProxy type deriving from
 * pattern::PoolConsumer, judging by the ctor names and mem-initializers) -
 * confirm against the real header.
 * @tparam CcClient the concrete client type being wrapped
 */
26 template <typename CcClient>
 /**
  * @brief ctor selected when U derives from time::TimerManager
  * @details passes &client as the second argument of the PoolConsumer base;
  * the first argument is true only when CcClient::INTERESTS_SIZE is non-zero.
  * @param client the client to wrap; only a reference is stored, so it has to
  * outlive this proxy
  */
29  template <typename U = CcClient>
30  PoolConsumerProxy(U& client
31  , typename std::enable_if<std::is_base_of<time::TimerManager, U>::value>::type* = nullptr)
32  : pattern::PoolConsumer(CcClient::INTERESTS_SIZE != 0, &client)
33  , client_(client){}
34 
 /**
  * @brief ctor selected when U does NOT derive from time::TimerManager
  * @details identical to the ctor above except that nullptr is passed as the
  * second argument of the PoolConsumer base.
  * @param client the client to wrap; only a reference is stored, so it has to
  * outlive this proxy
  */
35  template <typename U = CcClient>
36  PoolConsumerProxy(U& client
37  , typename std::enable_if<!std::is_base_of<time::TimerManager, U>::value>::type* = nullptr)
38  : pattern::PoolConsumer(CcClient::INTERESTS_SIZE != 0, nullptr)
39  , client_(client){}
40 
 /**
  * @brief forwards the range [it, end) and the thread serial number to
  * CcClient::handleRangeImpl
  */
41  virtual void handleRangeImpl(BufIt it,
42  BufIt end, uint16_t threadSerialNumber) override {
 // the CcClient:: qualification pins the call to CcClient's own implementation
43  client_.CcClient::handleRangeImpl(it, end, threadSerialNumber);
44  }
 /** @brief forwards to CcClient::messageDispatchingStartedCb with the same threadId */
45  virtual void messageDispatchingStartedCb(uint16_t threadId) override {
46  client_.CcClient::messageDispatchingStartedCb(threadId);
47  }
 /** @brief forwards to CcClient::invokedCb with the same threadId */
48  virtual void invokedCb(uint16_t threadId) override {
49  client_.CcClient::invokedCb(threadId);
50  }
 /** @brief forwards the exception to CcClient::stoppedCb */
51  virtual void stoppedCb(std::exception const&e) override {
52  client_.CcClient::stoppedCb(e);
53  }
 /**
  * @brief forwards to CcClient::droppedCb and destroys this proxy when it returns true
  * @return whatever CcClient::droppedCb returned
  */
54  virtual bool droppedCb() override {
55  if (client_.CcClient::droppedCb()) {
 // the proxy deletes itself here - no member may be touched after this line
 // NOTE(review): this requires the proxy to have been created with new - confirm at the creation site
56  delete this;
57  return true;
58  } else {
59  return false;
60  }
61  }
62 private:
 // the wrapped client; held by reference, not owned (only the proxy itself is deleted in droppedCb)
63  CcClient& HMBDC_RESTRICT client_;
64 };
65 
/**
 * @brief default values of the compile-time flags derived from a context property list
 * @details the specializations that follow in this file inherit from an
 * aggregator of the remaining properties and redeclare only the enumerators
 * they change, so the values here apply whenever no property overrides them.
 * NOTE(review): this listing skips original lines 67-69, which should hold the
 * struct head (presumably the primary context_property_aggregator template) -
 * confirm against the real header.
 */
66 template <typename... ContextProperties>
70  enum {
 // defaults: broadcast_msg and has_pool are on, every other flag is off
71  broadcast_msg = 1,
72  has_pool = 1,
73  pool_msgless = 0,
74  create_ipc = 0,
75  attach_ipc = 0,
 // NOTE(review): nothing in this listing sets dispatch_reverse to a non-zero value
76  dispatch_reverse = 0,
77  pci_ipc = 0,
78  };
79 };
80 
/**
 * @brief aggregator step for a leading context_property::broadcast<c>
 * @details inherits the flags computed from the remaining properties and
 * redeclares broadcast_msg and has_pool as 1.
 * NOTE(review): this listing skips original line 85; whatever is declared
 * there is not visible here.
 */
81 template <uint16_t c, typename... ContextProperties>
82 struct context_property_aggregator<context_property::broadcast<c>
83  , ContextProperties...>
84 : context_property_aggregator<ContextProperties...> {
 // compile-time check: fails if the remaining properties already set broadcast_msg to 0
86  static_assert(context_property_aggregator<ContextProperties...>::broadcast_msg, "contradicting properties");
87  enum {
88  broadcast_msg = 1,
89  has_pool = 1,
90  };
91 };
92 
/**
 * @brief aggregator step for a leading context_property::partition
 * @details inherits the flags computed from the remaining properties, sets
 * broadcast_msg to 0 and makes has_pool equal to the pool_msgless value of the
 * remaining properties.
 * NOTE(review): this listing skips original line 97; whatever is declared
 * there is not visible here.
 */
93 template <typename... ContextProperties>
94 struct context_property_aggregator<context_property::partition
95  , ContextProperties...>
96 : context_property_aggregator<ContextProperties...> {
98  enum {
99  broadcast_msg = 0,
 // a pool exists here only when the remaining properties request a msgless pool
100  has_pool = context_property_aggregator<ContextProperties...>::pool_msgless,
101  };
102 };
103 
/**
 * @brief aggregator step for a leading context_property::msgless_pool
 * @details inherits the flags computed from the remaining properties and
 * redeclares has_pool and pool_msgless as 1.
 */
104 template <typename... ContextProperties>
105 struct context_property_aggregator<context_property::msgless_pool
106  , ContextProperties...>
107 : context_property_aggregator<ContextProperties...> {
108  enum {
109  has_pool = 1,
110  pool_msgless = 1
111  };
112 };
113 
/**
 * @brief aggregator step for a leading context_property::ipc_creator
 * @details inherits the flags computed from the remaining properties and
 * redeclares create_ipc as 1.
 * NOTE(review): this listing skips original line 118; whatever is declared
 * there is not visible here.
 */
114 template <typename... ContextProperties>
115 struct context_property_aggregator<context_property::ipc_creator
116  , ContextProperties...>
117 : context_property_aggregator<ContextProperties...> {
119  enum {
120  create_ipc = 1,
121  };
122 };
123 
/**
 * @brief aggregator step for a leading context_property::ipc_attacher
 * @details inherits the flags computed from the remaining properties and
 * redeclares attach_ipc as 1.
 * NOTE(review): this listing skips original line 128; whatever is declared
 * there is not visible here.
 */
124 template <typename... ContextProperties>
125 struct context_property_aggregator<context_property::ipc_attacher
126  , ContextProperties...>
127 : context_property_aggregator<ContextProperties...> {
129  enum {
130  attach_ipc = 1,
131  };
132 };
133 
/**
 * @brief aggregator step for a leading context_property::pci_ipc
 * @details inherits the flags computed from the remaining properties and
 * redeclares pci_ipc as 1.
 * NOTE(review): this listing skips original line 138; whatever is declared
 * there is not visible here.
 */
134 template <typename... ContextProperties>
135 struct context_property_aggregator<context_property::pci_ipc
136  , ContextProperties...>
137 : context_property_aggregator<ContextProperties...> {
139  enum {
140  pci_ipc = 1,
141  };
142 };
143 
144 using namespace std;
145 using namespace hmbdc::time;
146 
/**
 * @brief generic form of the timer runner: a functor that does nothing
 * @details used for every value of the bool argument that has no explicit
 * specialization; the call operator accepts any object and leaves it untouched.
 * @tparam is_timer_manager compile-time switch; not used by this generic form
 */
template <bool is_timer_manager>
struct tm_runner {
    /// accepts a client of any type and intentionally performs no work
    template <typename Client>
    void operator()(Client& /*ignored*/) {
    }
};
152 
153 template <>
154 struct tm_runner<true> {
155  void operator()(TimerManager& tm) {
156  tm.checkTimers(SysTime::now());
157  }
158 };
159 
/**
 * @brief runs one iteration of a client against a buffer of type LFB
 * @details in order: calls tr(c); then, when the client's INTERESTS_SIZE is
 * non-zero, peeks a batch from lfb, hands it to the client's handleRangeImpl,
 * calls invokedCb and finally wasteAfterPeek; otherwise only invokedCb is
 * called. Anything thrown along the way is caught here.
 * @tparam LFB buffer type; must provide iterator, peek and wasteAfterPeek as used below
 * @tparam CcClient the client type
 * @param hmbdcNumber number passed to the buffer calls and to the client callbacks
 * @param stopped when true at the time an exception is caught, the exception is
 * ignored and the function returns true
 * @param lfb the buffer to read from
 * @param c the client to run
 * @return false only when an exception was caught while stopped was false and
 * c.dropped() returned true; true in every other case
 */
160 template <typename LFB, typename CcClient>
161 bool runOnceImpl(uint16_t hmbdcNumber, bool& HMBDC_RESTRICT stopped
162  , LFB& HMBDC_RESTRICT lfb, CcClient& HMBDC_RESTRICT c) {
163  typename LFB::iterator begin, end;
164  try {
 // NOTE(review): this listing skips original line 165, where tr must be declared -
 // presumably a tm_runner like the one in the MonoLockFreeBuffer overload; confirm
166  tr(c);
167 
168  using Cc = typename std::decay<CcClient>::type;
169 
170  const bool clientParticipateInMessaging = Cc::INTERESTS_SIZE != 0;
171  if (clientParticipateInMessaging) {
 // batch size is capped by the client's maxBatchMessageCount()
172  uint64_t count = lfb.peek(hmbdcNumber, begin, end, c.maxBatchMessageCount());
 // Cc:: qualified calls pin the call to Cc's own implementation
173  c.Cc::handleRangeImpl(begin, end, hmbdcNumber);
174  c.Cc::invokedCb(hmbdcNumber);
 // hands the peeked count back to the buffer only after the client has run
175  lfb.wasteAfterPeek(hmbdcNumber, count);
176  } else {
 // clients with no interests get invokedCb with 0xffff minus the number instead
177  c.Cc::invokedCb(0xffffu - hmbdcNumber);
178  }
179  } catch (std::exception const& e) {
 // all three handlers follow the same pattern: report via c.stopped, then ask c.dropped
180  if (!stopped) {
181  c.stopped(e);
182  return !c.dropped();
183  }
184  } catch (int code) {
 // a thrown int is wrapped into hmbdc::ExitCode before being reported
185  if (!stopped) {
186  c.stopped(hmbdc::ExitCode(code));
187  return !c.dropped();
188  }
189  } catch (...) {
 // anything else is reported as hmbdc::UnknownException
190  if (!stopped) {
191  c.stopped(hmbdc::UnknownException());
192  return !c.dropped();
193  }
194  }
195  return true;
196 }
197 
/**
 * @brief runs one iteration of a client against a MonoLockFreeBuffer
 * @details in order: runs tm_runner on the client (a timer check only when
 * CcClient derives from TimerManager); then, when the client's INTERESTS_SIZE
 * is non-zero, peeks a batch from lfb, hands it to the client's
 * handleRangeImpl, calls invokedCb and finally wasteAfterPeek; otherwise only
 * invokedCb is called. Anything thrown along the way is caught here.
 * @tparam CcClient the client type
 * @param threadSerialNumber number passed to the client callbacks
 * @param stopped when true at the time an exception is caught, the exception is
 * ignored and the function returns true
 * @param lfb the buffer to read from
 * @param c the client to run
 * @return false only when an exception was caught while stopped was false and
 * c.dropped() returned true; true in every other case
 */
198 template <typename CcClient>
199 bool runOnceImpl(uint16_t threadSerialNumber, bool& HMBDC_RESTRICT stopped
200  , hmbdc::pattern::MonoLockFreeBuffer& HMBDC_RESTRICT lfb, CcClient& HMBDC_RESTRICT c) {
 // NOTE(review): this listing skips original line 201, where begin and end must be
 // declared - presumably MonoLockFreeBuffer iterators; confirm
202  try {
 // selects the no-op runner unless CcClient derives from TimerManager
203  tm_runner<std::is_base_of<TimerManager, CcClient>::value> tr;
204  tr(c);
205 
206  using Cc = typename std::decay<CcClient>::type;
207  const bool clientParticipateInMessaging = Cc::INTERESTS_SIZE != 0;
208  if (clientParticipateInMessaging) {
 // batch size is capped by the client's maxBatchMessageCount()
209  uint64_t count = lfb.peek(begin, end, c.maxBatchMessageCount());
 // Cc:: qualified calls pin the call to Cc's own implementation
210  c.Cc::handleRangeImpl(begin, end, threadSerialNumber);
211  c.Cc::invokedCb(threadSerialNumber);
 // unlike the generic overload, wasteAfterPeek takes the begin iterator here
212  lfb.wasteAfterPeek(begin, count);
213  } else {
 // clients with no interests get invokedCb with 0xffff minus the number instead
214  c.Cc::invokedCb(0xffffu - threadSerialNumber);
215  }
216  } catch (std::exception const& e) {
 // all three handlers follow the same pattern: report via c.stopped, then ask c.dropped
217  if (!stopped) {
218  c.stopped(e);
219  return !c.dropped();
220  }
221  } catch (int code) {
 // a thrown int is wrapped into hmbdc::ExitCode before being reported
222  if (!stopped) {
223  c.stopped(hmbdc::ExitCode(code));
224  return !c.dropped();
225  }
226  } catch (...) {
 // anything else is reported as hmbdc::UnknownException
227  if (!stopped) {
228  c.stopped(hmbdc::UnknownException());
229  return !c.dropped();
230  }
231  }
232  return true;
233 }
234 
235 inline
236 void unblock(MonoLockFreeBuffer& buffer, uint16_t){
237  buffer.reset();
238 }
239 
/**
 * @brief generic unblock: calls markDead on the buffer for one thread serial number
 * @tparam Buffer any type providing markDead(uint16_t)
 * @param lfb the buffer to notify
 * @param threadSerialNumber forwarded unchanged to Buffer::markDead
 */
template <typename Buffer>
void unblock(Buffer& lfb, uint16_t threadSerialNumber)
{
    lfb.markDead(threadSerialNumber);
}
244 
245 }}}
Definition: MonoLockFreeBuffer.hpp:15
Definition: ContextDetail.hpp:148
Definition: Timers.hpp:66
the default vanilla allocator
Definition: Allocators.hpp:145
Unknown exception.
Definition: Exception.hpp:17
similar to ShmBasePtrAllocator but using dev memory
Definition: Allocators.hpp:117
Definition: ContextDetail.hpp:27
Definition: LockFreeBufferT.hpp:18
Definition: PoolConsumer.hpp:13
Exception that just has an exit code.
Definition: Exception.hpp:28
helps allocate an object and its aggregated objects in a contiguous shared memory ...
Definition: Allocators.hpp:88
Definition: LockFreeBufferMisc.hpp:89