hmbdc
simplify-high-performance-messaging-programming
PoolMinusImpl.hpp
1 #include "RingBuffer.hpp"
2 #include "hmbdc/numeric/BitMath.hpp"
3 #include "hmbdc/time/Timers.hpp"
4 #include "hmbdc/pattern/PoolMinus.hpp"
5 #include "hmbdc/pattern/LockFreeBufferT.hpp"
6 #include "hmbdc/Exception.hpp"
7 #include "hmbdc/Compile.hpp"
8 #include "hmbdc/os/Thread.hpp"
9 
10 #include <boost/smart_ptr/detail/yield_k.hpp>
11 #include <utility>
12 #include <functional>
13 #include <vector>
14 #include <thread>
15 #include <string>
16 
17 #pragma GCC diagnostic push
18 #ifdef __clang__
19 #pragma GCC diagnostic ignored "-Waddress-of-packed-member"
20 #endif
21 
22 namespace hmbdc { namespace pattern {
23 
24 struct PoolMinusImpl {
25  PoolMinusImpl(PoolMinusImpl const&) = delete;
26  PoolMinusImpl& operator = (PoolMinusImpl const&) = delete;
27 
28  PoolMinusImpl(uint16_t maxConsumerSize
29  , uint64_t activePoolThreadSequenceMask = 0) //if set other than 0, start cannot be called
30  : stopped_(false)
31  , activePoolThreadSequenceMask_(activePoolThreadSequenceMask)
32  , consumerCount_(0u)
33  , consumerQ_(hmbdc::numeric::log2Upper(maxConsumerSize * 2u)) {
34  }
35 
36  uint32_t consumerSize() const {
37  return consumerCount_;
38  }
39 
40  void addConsumer(PoolConsumer& c, uint64_t poolThreadAffinityIn) {
41  auto count = __sync_add_and_fetch(&consumerCount_, 1u);
42  if (count > consumerQ_.CAPACITY) {
43  __sync_sub_and_fetch(&consumerCount_, 1u);
44  HMBDC_THROW(std::runtime_error
45  , "too many consumers, poolsize = " << consumerCount_);
46  }
47 
48  auto ptr = static_cast<PoolConsumer*>(&c);
49  ptr->reset();
50  c.poolThreadAffinity = poolThreadAffinityIn;
51  consumerQ_.put(&c);
52  }
53 
54  void start(uint16_t threadCount, uint64_t cpuAffinityMask) {
55  if (threadCount > 64u) {
56  HMBDC_THROW(std::runtime_error,
57  "threadCount too large " << threadCount << " > 64");
58  }
59  if (activePoolThreadSequenceMask_) {
60  HMBDC_THROW(std::runtime_error, "cannot restart a pool.");
61  }
62  activePoolThreadSequenceMask_ = (1ul << threadCount) - 1u;
63  for (auto i = 0u; i < threadCount; ++i) {
64  threads_.push_back(
65  std::thread([this, i, cpuAffinityMask]() {
66  threadEntry(i, cpuAffinityMask);
67  })
68  );
69  }
70  }
71 
72  void stop() {
73  __sync_synchronize();
74  stopped_ = true;
75  }
76 
77  void join() {
78  for (Threads::value_type& t : threads_) t.join();
79  threads_.clear();
80  }
81 
83 
84  template <typename HoggingCond>
85  void runOnce(uint16_t threadSerialNumber
86  , HoggingCond&& hoggingCond) {
87  runOnceImpl(threadSerialNumber, std::forward<HoggingCond>(hoggingCond));
88  }
89 
90  void cleanup(uint16_t threadSerialNumber) {
91  if (stopped_) {
92  while (consumerCount_) {
93  PoolConsumer* consumer;
94  if (consumerQ_.tryTake(consumer)) {
95  if (hmbdc_unlikely(!(consumer->poolThreadAffinity & (1u << threadSerialNumber)))) {
96  consumerQ_.put(consumer);
97  continue;
98  }
99  __sync_sub_and_fetch(&consumerCount_, 1u);
100  consumer->dropped();
101  }
102  }
103  }
104  }
105 
106 private:
107  void threadEntry(uint16_t threadSerialNumber, uint64_t cpuAffinityMask) HMBDC_RESTRICT {
108  std::string name = "hmbdcp" + std::to_string(threadSerialNumber);
109  if (cpuAffinityMask == 0) {
110  auto cpuCount = std::thread::hardware_concurrency();
111  cpuAffinityMask = (1ul << cpuCount) - 1u;
112  }
113  using namespace hmbdc::numeric;
114  cpuAffinityMask = nthSetBitFromLsb(cpuAffinityMask
115  , threadSerialNumber % setBitsCount(cpuAffinityMask));
116 
117  hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask);
118 
119  while(!stopped_){
120  runOnceImpl(threadSerialNumber, [](){return true;});
121  }
122  cleanup(threadSerialNumber);
123  }
124 
125  template <typename HoggingCond>
126  void runOnceImpl(uint16_t threadSerialNumber, HoggingCond&& hoggingCond) HMBDC_RESTRICT {
127  if (hmbdc_unlikely(stopped_)) return;
128  PoolConsumer* consumer;
129  if (hmbdc_unlikely(!consumerQ_.tryTake(consumer))) return;
130  if (hmbdc_unlikely(!(consumer->poolThreadAffinity & (1u << threadSerialNumber)))) {
131  consumerQ_.put(consumer);
132  return;
133  }
134  try {
135  if (hmbdc_unlikely(!consumer->messageDispatchingStarted_)) {
136  consumer->messageDispatchingStarted();
137  }
138  do {
139  consumer->invoked(threadSerialNumber);
140  } while (hoggingCond() && consumerQ_.remainingSize() == 0 && !stopped_);
141  } catch (std::exception const& e) {
142  consumer->stopped(e);
143  __sync_sub_and_fetch(&consumerCount_, 1u);
144  if (!consumer->dropped()) {//resume
145  addConsumer(*consumer
146  , consumer->poolThreadAffinity & activePoolThreadSequenceMask_);
147  }
148  return;
149  }
150  consumerQ_.put(consumer);
151  }
152 
153 
154  using Threads = std::vector<std::thread>;
155  Threads threads_;
156  bool stopped_;
157  uint64_t activePoolThreadSequenceMask_;
158  uint32_t consumerCount_;
159  ConsumerQ consumerQ_;
160 };
161 }}
162 
163 #pragma GCC diagnostic pop
164 
165 #include "hmbdc/os/Allocators.hpp"
166 
167 namespace hmbdc { namespace pattern {
168 
169 inline
170 PoolMinus::
171 PoolMinus(uint32_t maxConsumerSize)
172 : impl_(os::DefaultAllocator::instance().allocate<PoolMinusImpl>(SMP_CACHE_BYTES, maxConsumerSize)) {
173 }
174 
175 inline
176 PoolMinus::
177 ~PoolMinus() {
178  os::DefaultAllocator::instance().unallocate(static_cast<PoolMinusImpl*>(impl_));
179 }
180 
181 inline void PoolMinus::addConsumer(PoolConsumer& c, uint64_t poolThreadAffinityIn) { static_cast<PoolMinusImpl*>(impl_)->addConsumer(c, poolThreadAffinityIn);}
182 inline uint32_t PoolMinus::consumerSize() const {return static_cast<PoolMinusImpl*>(impl_)->consumerSize();}
183 inline void PoolMinus::start(uint16_t threadCount, uint64_t cpuAffinityMask, bool) { static_cast<PoolMinusImpl*>(impl_)->start(threadCount, cpuAffinityMask);}
184 inline void PoolMinus::runOnce(uint16_t threadSerialNumber) HMBDC_RESTRICT { static_cast<PoolMinusImpl*>(impl_)->runOnce(threadSerialNumber, [](){return true;});}
185 inline void PoolMinus::stop() { static_cast<PoolMinusImpl*>(impl_)->stop();}
186 inline void PoolMinus::join() { static_cast<PoolMinusImpl*>(impl_)->join();}
187 }}
Definition: BitMath.hpp:6
Definition: PoolConsumer.hpp:16
Definition: PoolMinusImpl.hpp:24
Definition: Base.hpp:12