1 #include "RingBuffer.hpp" 2 #include "hmbdc/numeric/BitMath.hpp" 3 #include "hmbdc/time/Timers.hpp" 4 #include "hmbdc/pattern/PoolMinus.hpp" 5 #include "hmbdc/pattern/LockFreeBufferT.hpp" 6 #include "hmbdc/Exception.hpp" 7 #include "hmbdc/Compile.hpp" 8 #include "hmbdc/os/Thread.hpp" 10 #include <boost/smart_ptr/detail/yield_k.hpp> 17 #pragma GCC diagnostic push 19 #pragma GCC diagnostic ignored "-Waddress-of-packed-member" 22 namespace hmbdc {
namespace pattern {
29 , uint64_t activePoolThreadSequenceMask = 0)
31 , activePoolThreadSequenceMask_(activePoolThreadSequenceMask)
33 , consumerQ_(hmbdc::numeric::log2Upper(maxConsumerSize * 2u)) {
36 uint32_t consumerSize()
const {
37 return consumerCount_;
40 void addConsumer(
PoolConsumer& c, uint64_t poolThreadAffinityIn) {
41 auto count = __sync_add_and_fetch(&consumerCount_, 1u);
42 if (count > consumerQ_.CAPACITY) {
43 __sync_sub_and_fetch(&consumerCount_, 1u);
44 HMBDC_THROW(std::runtime_error
45 ,
"too many consumers, poolsize = " << consumerCount_);
50 c.poolThreadAffinity = poolThreadAffinityIn;
54 void start(uint16_t threadCount, uint64_t cpuAffinityMask) {
55 if (threadCount > 64u) {
56 HMBDC_THROW(std::runtime_error,
57 "threadCount too large " << threadCount <<
" > 64");
59 if (activePoolThreadSequenceMask_) {
60 HMBDC_THROW(std::runtime_error,
"cannot restart a pool.");
62 activePoolThreadSequenceMask_ = (1ul << threadCount) - 1u;
63 for (
auto i = 0u; i < threadCount; ++i) {
65 std::thread([
this, i, cpuAffinityMask]() {
66 threadEntry(i, cpuAffinityMask);
78 for (Threads::value_type& t : threads_) t.join();
84 template <
typename HoggingCond>
85 void runOnce(uint16_t threadSerialNumber
86 , HoggingCond&& hoggingCond) {
87 runOnceImpl(threadSerialNumber, std::forward<HoggingCond>(hoggingCond));
90 void cleanup(uint16_t threadSerialNumber) {
92 while (consumerCount_) {
94 if (consumerQ_.tryTake(consumer)) {
95 if (hmbdc_unlikely(!(consumer->poolThreadAffinity & (1u << threadSerialNumber)))) {
96 consumerQ_.put(consumer);
99 __sync_sub_and_fetch(&consumerCount_, 1u);
107 void threadEntry(uint16_t threadSerialNumber, uint64_t cpuAffinityMask) HMBDC_RESTRICT {
108 std::string name =
"hmbdcp" + std::to_string(threadSerialNumber);
109 if (cpuAffinityMask == 0) {
110 auto cpuCount = std::thread::hardware_concurrency();
111 cpuAffinityMask = (1ul << cpuCount) - 1u;
114 cpuAffinityMask = nthSetBitFromLsb(cpuAffinityMask
115 , threadSerialNumber % setBitsCount(cpuAffinityMask));
117 hmbdc::os::configureCurrentThread(name.c_str(), cpuAffinityMask);
120 runOnceImpl(threadSerialNumber, [](){
return true;});
122 cleanup(threadSerialNumber);
125 template <
typename HoggingCond>
126 void runOnceImpl(uint16_t threadSerialNumber, HoggingCond&& hoggingCond) HMBDC_RESTRICT {
127 if (hmbdc_unlikely(stopped_))
return;
129 if (hmbdc_unlikely(!consumerQ_.tryTake(consumer)))
return;
130 if (hmbdc_unlikely(!(consumer->poolThreadAffinity & (1u << threadSerialNumber)))) {
131 consumerQ_.put(consumer);
135 if (hmbdc_unlikely(!consumer->messageDispatchingStarted_)) {
136 consumer->messageDispatchingStarted();
139 consumer->invoked(threadSerialNumber);
140 }
while (hoggingCond() && consumerQ_.remainingSize() == 0 && !stopped_);
141 }
catch (std::exception
const& e) {
142 consumer->stopped(e);
143 __sync_sub_and_fetch(&consumerCount_, 1u);
144 if (!consumer->dropped()) {
145 addConsumer(*consumer
146 , consumer->poolThreadAffinity & activePoolThreadSequenceMask_);
150 consumerQ_.put(consumer);
154 using Threads = std::vector<std::thread>;
157 uint64_t activePoolThreadSequenceMask_;
158 uint32_t consumerCount_;
163 #pragma GCC diagnostic pop 165 #include "hmbdc/os/Allocators.hpp" 167 namespace hmbdc {
namespace pattern {
171 PoolMinus(uint32_t maxConsumerSize)
172 : impl_(os::DefaultAllocator::instance().allocate<PoolMinusImpl>(SMP_CACHE_BYTES, maxConsumerSize)) {
178 os::DefaultAllocator::instance().unallocate(static_cast<PoolMinusImpl*>(impl_));
181 inline void PoolMinus::addConsumer(PoolConsumer& c, uint64_t poolThreadAffinityIn) {
static_cast<PoolMinusImpl*
>(impl_)->addConsumer(c, poolThreadAffinityIn);}
182 inline uint32_t PoolMinus::consumerSize()
const {
return static_cast<PoolMinusImpl*
>(impl_)->consumerSize();}
183 inline void PoolMinus::start(uint16_t threadCount, uint64_t cpuAffinityMask,
bool) {
static_cast<PoolMinusImpl*
>(impl_)->start(threadCount, cpuAffinityMask);}
184 inline void PoolMinus::runOnce(uint16_t threadSerialNumber) HMBDC_RESTRICT {
static_cast<PoolMinusImpl*
>(impl_)->runOnce(threadSerialNumber, [](){
return true;});}
185 inline void PoolMinus::stop() {
static_cast<PoolMinusImpl*
>(impl_)->stop();}
186 inline void PoolMinus::join() {
static_cast<PoolMinusImpl*
>(impl_)->join();}
Definition: BitMath.hpp:6
Definition: PoolConsumer.hpp:16
Definition: PoolMinusImpl.hpp:24