hmbdc
simplify-high-performance-messaging-programming
ContextDetail.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/app/Client.hpp"
5 #include "hmbdc/app/Message.hpp"
6 
7 #include "hmbdc/pattern/PoolT.hpp"
8 #include "hmbdc/pattern/PoolMinus.hpp"
9 #include "hmbdc/pattern/LockFreeBufferT.hpp"
10 #include "hmbdc/pattern/MonoLockFreeBuffer.hpp"
11 
12 #include "hmbdc/os/Thread.hpp"
13 #include "hmbdc/os/Allocators.hpp"
14 #include "hmbdc/time/Timers.hpp"
15 
16 #include <vector>
17 #include <type_traits>
18 #include <thread>
19 #include <utility>
20 
21 namespace hmbdc { namespace app { namespace context_detail {
22 using namespace std;
23 using namespace hmbdc::pattern;
24 
25 template <typename CcClient>
28  template <typename U = CcClient>
29  PoolConsumerProxy(U& client
30  , size_t* pDispStartCount
31  , typename std::enable_if<std::is_base_of<time::TimerManager, U>::value>::type* = nullptr)
32  : pattern::PoolConsumer(CcClient::INTERESTS_SIZE != 0, &client, pDispStartCount)
33  , client_(client){}
34 
35  template <typename U = CcClient>
36  PoolConsumerProxy(U& client
37  , size_t* pDispStartCount
38  , typename std::enable_if<!std::is_base_of<time::TimerManager, U>::value>::type* = nullptr)
39  : pattern::PoolConsumer(CcClient::INTERESTS_SIZE != 0, nullptr, pDispStartCount)
40  , client_(client){}
41 
42  virtual size_t handleRangeImpl(BufIt it,
43  BufIt end, uint16_t threadSerialNumber) override {
44  return client_.CcClient::handleRangeImpl(it, end, threadSerialNumber);
45  }
46  virtual void messageDispatchingStartedCb(size_t const* dispStartCount) override {
47  client_.CcClient::messageDispatchingStartedCb(dispStartCount);
48  }
49  virtual void invokedCb(size_t n) override {
50  client_.CcClient::invokedCb(n);
51  }
52  virtual void stoppedCb(std::exception const&e) override {
53  client_.CcClient::stoppedCb(e);
54  }
55  virtual bool droppedCb() override {
56  if (client_.CcClient::droppedCb()) {
57  delete this;
58  return true;
59  } else {
60  return false;
61  }
62  }
63 private:
64  CcClient& HMBDC_RESTRICT client_;
65 };
66 
/**
 * @brief primary template: the flag values used when no recognised context
 * property is left in the list
 * @details the specializations each strip one property off the front of the
 * list, inherit from the aggregator of the remaining properties and
 * override individual flags.
 *
 * NOTE(review): this copy of the header skips two listing lines (69-70)
 * right after the struct header; they presumably hold further member
 * declarations - confirm against the upstream hmbdc header.
 */
template <typename... ContextProperties>
struct context_property_aggregator {
    enum {
        broadcast_msg = 1,
        has_pool = 1,
        pool_msgless = 0,
        ipc = 0,
        dispatch_reverse = 0,
    };
};
79 
80 template <uint16_t c, typename... ContextProperties>
81 struct context_property_aggregator<context_property::broadcast<c>
82  , ContextProperties...>
83 : context_property_aggregator<ContextProperties...> {
85  static_assert(context_property_aggregator<ContextProperties...>::broadcast_msg, "contradicting properties");
86  enum {
87  broadcast_msg = 1,
88  has_pool = 1,
89  };
90 };
91 
92 template <typename... ContextProperties>
93 struct context_property_aggregator<context_property::partition
94  , ContextProperties...>
95 : context_property_aggregator<ContextProperties...> {
97  enum {
98  broadcast_msg = 0,
99  has_pool = context_property_aggregator<ContextProperties...>::pool_msgless,
100  };
101 };
102 
103 template <typename... ContextProperties>
104 struct context_property_aggregator<context_property::msgless_pool
105  , ContextProperties...>
106 : context_property_aggregator<ContextProperties...> {
107  enum {
108  has_pool = 1,
109  pool_msgless = 1
110  };
111 };
112 
113 template <typename... ContextProperties>
114 struct context_property_aggregator<context_property::ipc_enabled
115  , ContextProperties...>
116 : context_property_aggregator<ContextProperties...> {
118  enum {
119  ipc = 1,
120  };
121 };
122 
123 template <typename... ContextProperties>
124 struct context_property_aggregator<context_property::pci_ipc
125  , ContextProperties...>
126 : context_property_aggregator<ContextProperties...> {
128  enum {
129  ipc = 1,
130  };
131 };
132 
133 using namespace std;
134 using namespace hmbdc::time;
135 
/**
 * @brief compile-time switch for running a client's timers
 * @details the primary template is a no-op that accepts any client type and
 * leaves it untouched; tm_runner<true> is specialized to do the timer work.
 *
 * @tparam is_timer_manager selects the specialization
 */
template <bool is_timer_manager>
struct tm_runner {
    /// does nothing; the argument is ignored
    template <typename C>
    void operator()(C&) {}
};
141 
142 template <>
143 struct tm_runner<true> {
144  void operator()(TimerManager& tm) {
145  tm.checkTimers(SysTime::now());
146  }
147 };
148 
149 template <typename LFB, typename CcClient>
150 bool runOnceImpl(uint16_t hmbdcNumber, bool& HMBDC_RESTRICT stopped
151  , LFB& HMBDC_RESTRICT lfb, CcClient& HMBDC_RESTRICT c) {
152  typename LFB::iterator begin, end;
153  try {
155  tr(c);
156 
157  using Cc = typename std::decay<CcClient>::type;
158 
159  const bool clientParticipateInMessaging = Cc::INTERESTS_SIZE != 0;
160  if (clientParticipateInMessaging) {
161  uint64_t count = lfb.peek(hmbdcNumber, begin, end, c.maxBatchMessageCount());
162  c.Cc::invokedCb(c.Cc::handleRangeImpl(begin, end, hmbdcNumber));
163  lfb.wasteAfterPeek(hmbdcNumber, count);
164  } else {
165  c.Cc::invokedCb(0);
166  }
167  } catch (std::exception const& e) {
168  if (!stopped) {
169  c.stopped(e);
170  return !c.dropped();
171  }
172  } catch (int code) {
173  if (!stopped) {
174  c.stopped(hmbdc::ExitCode(code));
175  return !c.dropped();
176  }
177  } catch (...) {
178  if (!stopped) {
179  c.stopped(hmbdc::UnknownException());
180  return !c.dropped();
181  }
182  }
183  return !stopped;
184 }
185 
186 template <typename CcClient>
187 bool runOnceImpl(uint16_t threadSerialNumber, bool& HMBDC_RESTRICT stopped
188  , hmbdc::pattern::MonoLockFreeBuffer& HMBDC_RESTRICT lfb, CcClient& HMBDC_RESTRICT c) {
190  try {
191  tm_runner<std::is_base_of<TimerManager, CcClient>::value> tr;
192  tr(c);
193 
194  using Cc = typename std::decay<CcClient>::type;
195  const bool clientParticipateInMessaging = Cc::INTERESTS_SIZE != 0;
196  if (clientParticipateInMessaging) {
197  uint64_t count = lfb.peek(begin, end, c.maxBatchMessageCount());
198  c.Cc::invokedCb(c.Cc::handleRangeImpl(begin, end, threadSerialNumber));
199  lfb.wasteAfterPeek(begin, count);
200  } else {
201  c.Cc::invokedCb(0);
202  }
203  } catch (std::exception const& e) {
204  if (!stopped) {
205  c.stopped(e);
206  return !c.dropped();
207  }
208  } catch (int code) {
209  if (!stopped) {
210  c.stopped(hmbdc::ExitCode(code));
211  return !c.dropped();
212  }
213  } catch (...) {
214  if (!stopped) {
215  c.stopped(hmbdc::UnknownException());
216  return !c.dropped();
217  }
218  }
219  return !stopped;
220 }
221 
222 inline
223 void unblock(MonoLockFreeBuffer& buffer, uint16_t){
224  buffer.reset();
225 }
226 
/**
 * @brief generic unblock: marks one consumer of the buffer as dead
 *
 * @tparam Buffer any type providing markDead(uint16_t)
 * @param lfb the buffer to notify
 * @param threadSerialNumber the number handed to lfb.markDead
 */
template <typename Buffer>
void unblock(Buffer& lfb, uint16_t threadSerialNumber) {
    lfb.markDead(threadSerialNumber);
}
231 
232 }}}
Definition: MonoLockFreeBuffer.hpp:16
Definition: ContextDetail.hpp:137
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
the default vanilla allocator
Definition: Allocators.hpp:153
Unknown exception.
Definition: Exception.hpp:17
similar to ShmBasePtrAllocator but using dev memory
Definition: Allocators.hpp:125
Definition: BlockingBuffer.hpp:11
Definition: ContextDetail.hpp:26
Definition: LockFreeBufferT.hpp:18
Definition: PoolConsumer.hpp:16
Definition: Rater.hpp:10
Exception that just has an exit code.
Definition: Exception.hpp:28
helps allocate an object and its aggregated objects in a continuous shared memory ...
Definition: Allocators.hpp:95
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89