hmbdc
simplify-high-performance-messaging-programming
PoolConsumer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/pattern/LockFreeBufferMisc.hpp"
5 #include "hmbdc/time/Timers.hpp"
6 #include "hmbdc/Exception.hpp"
7 #include "hmbdc/Config.hpp"
8 #include <boost/smart_ptr/detail/yield_k.hpp>
9 #include <limits>
10 #include <iostream>
11 
12 #include <stdexcept>
13 
14 namespace hmbdc { namespace pattern {
15 
/**
 * @brief base type for a consumer that is driven through handleRange() by the
 * befriended pool implementations (PoolImpl, PoolTImpl, PoolMinusImpl)
 * @details the public non-virtual entry points forward to private virtual
 * *Cb hooks whose default implementations do nothing (or return a fixed
 * value); a concrete consumer overrides the hooks it cares about.
 *
 * NOTE(review): the members tmPtr and BufIt are used by this class but their
 * declarations are not visible in this listing - confirm against the real
 * header before editing the member layout.
 */
16 struct PoolConsumer {
17  friend struct PoolImpl;
18 
19  template <typename Buffer>
20  friend struct PoolTImpl;
21  friend struct PoolMinusImpl;
 /**
  * @param interestedInMessages when false handleRange() never calls
  * handleRangeImpl() and only reports invocations
  * @param tmPtr timer manager checked on every invoked() call; may be null
  * @param pDispStartCount counter bumped once when dispatching starts; may be null
  */
22  PoolConsumer(bool interestedInMessages, time::TimerManager* tmPtr, size_t* pDispStartCount);
23  virtual ~PoolConsumer();
 /// forwards the exception to stoppedCb(); anything stoppedCb() throws is swallowed
24  void stopped(std::exception const&) noexcept;
 /// returns droppedCb()'s result, or true when droppedCb() throws
25  bool dropped() noexcept;
 /**
  * @brief fire messageDispatchingStartedCb() the first time this is called
  * @details when a start counter was supplied it is atomically incremented
  * (release ordering) and its address is handed to the callback; otherwise
  * the callback receives a null pointer. Later calls do nothing.
  */
26  void messageDispatchingStarted() {
27  if (hmbdc_unlikely(!messageDispatchingStarted_)) {
28  messageDispatchingStartedCb(pDispStartCount_
29  ?(__atomic_add_fetch(pDispStartCount_, 1, __ATOMIC_RELEASE), pDispStartCount_)
30  :0);
 // the flag is set only after the callback returned without throwing
31  messageDispatchingStarted_ = true;
32  }
33  }
 /// checks the timers (when a timer manager is present), then calls invokedCb() with the same count
34  void invoked(size_t);
35 
36 protected:
38 
39 private:
 /// puts affinity, dropped count and nextSeq_ back to their constructed values
40  void reset();
 /// dispatch entry point; returns false once the consumer has been stopped
41  bool handleRange(BufIt begin,
42  BufIt end, uint16_t threadId) noexcept;
 // NOTE(review): declared here, but no definition is visible in this file
43  bool handleInvokeOnly(uint16_t threadId) noexcept;
 // --- customisation hooks; the defaults are no-ops ---
 /// receives the start counter's address, or null when no counter was supplied
44  virtual void messageDispatchingStartedCb(size_t const*) {}
 /// called by handleRange() with the not-yet-consumed part of the range; its
 /// return value is passed on to invoked()
45  virtual size_t handleRangeImpl(BufIt begin,
46  BufIt end, uint16_t threadId){ return 0; };
47  virtual void invokedCb(size_t) {}
48  virtual void stoppedCb(std::exception const&) {}
 /// default reports true
49  virtual bool droppedCb() { return true; }
50 
 // set to all ones by the constructor and by reset()
52  uint64_t poolThreadAffinity;
 // set to zero by the constructor and by reset()
53  uint16_t droppedCount;
 // not written anywhere in this file; presumably maintained by the friend pool types - TODO confirm
54  uint64_t skippedPoolThreadMask_;
 // next sequence to consume, or one of the sentinel values used by
 // handleRange(); declared with SMP_CACHE_BYTES alignment
55  HMBDC_SEQ_TYPE nextSeq_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
56  bool interestedInMessages;
57  bool messageDispatchingStarted_;
58  size_t* pDispStartCount_ = nullptr;
59 };
60 
61 }} // end namespace hmbdc::pattern
62 
63 
64 namespace hmbdc { namespace pattern {
65 
// NOTE(review): this using-directive sits in a header, so every includer gets
// the boost names made visible inside hmbdc::pattern. Nothing in the code
// visible below appears to rely on it - confirm against the other includers
// before removing it.
66 using namespace boost;
67 
68 inline
69 PoolConsumer::
70 PoolConsumer(bool interestedInMessages, time::TimerManager* tmPtr, size_t* pDispStartCount)
71 : tmPtr(tmPtr)
72 , poolThreadAffinity(0xfffffffffffffffful)
73 , droppedCount(0u)
74 , nextSeq_(std::numeric_limits<typename BufIt::Sequence>::max())
75 , interestedInMessages(interestedInMessages)
76 , messageDispatchingStarted_(false)
77 , pDispStartCount_(pDispStartCount) {
78 }
79 
80 inline
81 void
82 PoolConsumer::
83 reset() {
84  poolThreadAffinity = 0xfffffffffffffffful;
85  droppedCount = 0u;
86  nextSeq_ = std::numeric_limits<uint64_t>::max();
87 }
88 
89 inline
90 PoolConsumer::
91 ~PoolConsumer(){
92 }
93 
94 inline
95 void
96 PoolConsumer::
97 invoked(size_t n) {
98  if (tmPtr) {
99  tmPtr->checkTimers(time::SysTime::now());
100  }
101  invokedCb(n);
102 }
103 
104 inline
105 bool
106 PoolConsumer::
107 handleRange(BufIt begin,
108  BufIt end, uint16_t threadId) HMBDC_RESTRICT noexcept {
109  const auto STOP = std::numeric_limits<typename BufIt::Sequence>::max() - 1;
110  const auto IGNORE = std::numeric_limits<typename BufIt::Sequence>::max() - 2;
111  bool res = true;
112  if (hmbdc_unlikely(nextSeq_ == std::numeric_limits<typename BufIt::Sequence>::max()
113  && __sync_bool_compare_and_swap(&nextSeq_
114  , std::numeric_limits<typename BufIt::Sequence>::max(), begin.seq()))) {
115  try {
116  messageDispatchingStarted();
117  }
118  catch (std::exception const& e) {
119  nextSeq_ = STOP;
120  stopped(e);
121  res = false;
122  } catch (int c) {
123  nextSeq_ = STOP;
124  stopped(hmbdc::ExitCode(c));
125  res = false;
126  } catch (...) {
127  nextSeq_ = STOP;
128  stopped(hmbdc::UnknownException());
129  res = false;
130  }
131  }
132  auto nextSeq = nextSeq_;
133  if (hmbdc_unlikely(nextSeq == STOP)) return false;
134  else if (hmbdc_unlikely(nextSeq == IGNORE)) return true;
135  if ((hmbdc_unlikely(interestedInMessages && begin.seq() > nextSeq))) {
136  return true;
137  } else if (hmbdc_likely(end.seq() > nextSeq)) {
138  if (hmbdc_likely(__sync_bool_compare_and_swap(&nextSeq_, nextSeq, IGNORE))) {
139  try {
140  size_t res = 0;
141  if (hmbdc_likely(interestedInMessages)) {
142  PoolConsumer::BufIt myStart = begin + (size_t)(nextSeq - begin.seq());
143  res = handleRangeImpl(myStart, end, threadId);
144  }
145  invoked(res);
146  } catch (std::exception const& e) {
147  nextSeq_ = STOP;
148  stopped(e);
149  res = false;
150  } catch (int c) {
151  nextSeq_ = STOP;
152  stopped(hmbdc::ExitCode(c));
153  res = false;
154  } catch (...) {
155  nextSeq_ = STOP;
156  stopped(hmbdc::UnknownException());
157  res = false;
158  }
159  __sync_synchronize();
160  if (hmbdc_likely(res)) nextSeq_ = end.seq();
161  }
162  } else if (hmbdc_unlikely(!interestedInMessages
163  || (end.seq() == nextSeq && begin.seq() == nextSeq))) {
164  if (hmbdc_unlikely(__sync_bool_compare_and_swap(&nextSeq_, nextSeq, IGNORE))) {
165  try {
166  invoked(0);
167  } catch (std::exception const& e) {
168  nextSeq_ = STOP;
169  stopped(e);
170  res = false;
171  } catch (int c) {
172  nextSeq_ = STOP;
173  stopped(hmbdc::ExitCode(c));
174  res = false;
175  } catch (...) {
176  nextSeq_ = STOP;
177  stopped(hmbdc::UnknownException());
178  res = false;
179  }
180  __sync_synchronize();
181  if (hmbdc_likely(res)) nextSeq_ = end.seq();
182  }
183  }
184 
185  return res;
186 }
187 
188 inline
189 void
190 PoolConsumer::
191 stopped(std::exception const&e) noexcept {
192  try {
193  stoppedCb(e);
194  } catch (...) {}
195 }
196 
197 inline
198 bool
199 PoolConsumer::
200 dropped() noexcept {
201  bool res = true;
202  try {
203  res = droppedCb();
204  } catch (...) {}
205  return res;
206 }
207 
208 }} // end namespace hmbdc::pattern
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
Unknown exception.
Definition: Exception.hpp:17
Definition: PoolConsumer.hpp:16
Definition: PoolT.ipp:26
Exception that just has an exit code.
Definition: Exception.hpp:28
Definition: PoolMinusImpl.hpp:24
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89