1 #include "hmbdc/Copyright.hpp" 4 #include "hmbdc/pattern/LockFreeBufferMisc.hpp" 5 #include "hmbdc/time/Timers.hpp" 6 #include "hmbdc/Exception.hpp" 7 #include "hmbdc/Config.hpp" 8 #include <boost/smart_ptr/detail/yield_k.hpp> 14 namespace hmbdc {
namespace pattern {
17 friend struct PoolImpl;
19 template <
typename Buffer>
24 void stopped(std::exception
const&) noexcept;
25 bool dropped() noexcept;
26 void messageDispatchingStarted() {
27 if (hmbdc_unlikely(!messageDispatchingStarted_)) {
28 messageDispatchingStartedCb(pDispStartCount_
29 ?(__atomic_add_fetch(pDispStartCount_, 1, __ATOMIC_RELEASE), pDispStartCount_)
31 messageDispatchingStarted_ =
true;
41 bool handleRange(
BufIt begin,
42 BufIt end, uint16_t threadId) noexcept;
43 bool handleInvokeOnly(uint16_t threadId) noexcept;
44 virtual void messageDispatchingStartedCb(
size_t const*) {}
45 virtual size_t handleRangeImpl(
BufIt begin,
46 BufIt end, uint16_t threadId){
return 0; };
47 virtual void invokedCb(
size_t) {}
48 virtual void stoppedCb(std::exception
const&) {}
49 virtual bool droppedCb() {
return true; }
52 uint64_t poolThreadAffinity;
53 uint16_t droppedCount;
54 uint64_t skippedPoolThreadMask_;
55 HMBDC_SEQ_TYPE nextSeq_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
56 bool interestedInMessages;
57 bool messageDispatchingStarted_;
58 size_t* pDispStartCount_ =
nullptr;
64 namespace hmbdc {
namespace pattern {
66 using namespace boost;
70 PoolConsumer(
bool interestedInMessages, time::TimerManager* tmPtr,
size_t* pDispStartCount)
72 , poolThreadAffinity(0xfffffffffffffffful)
74 , nextSeq_(
std::numeric_limits<typename BufIt::Sequence>::max())
75 , interestedInMessages(interestedInMessages)
76 , messageDispatchingStarted_(false)
77 , pDispStartCount_(pDispStartCount) {
84 poolThreadAffinity = 0xfffffffffffffffful;
86 nextSeq_ = std::numeric_limits<uint64_t>::max();
99 tmPtr->checkTimers(time::SysTime::now());
107 handleRange(BufIt begin,
108 BufIt end, uint16_t threadId) HMBDC_RESTRICT noexcept {
109 const auto STOP = std::numeric_limits<typename BufIt::Sequence>::max() - 1;
110 const auto IGNORE = std::numeric_limits<typename BufIt::Sequence>::max() - 2;
112 if (hmbdc_unlikely(nextSeq_ == std::numeric_limits<typename BufIt::Sequence>::max()
113 && __sync_bool_compare_and_swap(&nextSeq_
114 , std::numeric_limits<typename BufIt::Sequence>::max(), begin.seq()))) {
116 messageDispatchingStarted();
118 catch (std::exception
const& e) {
132 auto nextSeq = nextSeq_;
133 if (hmbdc_unlikely(nextSeq == STOP))
return false;
134 else if (hmbdc_unlikely(nextSeq == IGNORE))
return true;
135 if ((hmbdc_unlikely(interestedInMessages && begin.seq() > nextSeq))) {
137 }
else if (hmbdc_likely(end.seq() > nextSeq)) {
138 if (hmbdc_likely(__sync_bool_compare_and_swap(&nextSeq_, nextSeq, IGNORE))) {
141 if (hmbdc_likely(interestedInMessages)) {
142 PoolConsumer::BufIt myStart = begin + (size_t)(nextSeq - begin.seq());
143 res = handleRangeImpl(myStart, end, threadId);
146 }
catch (std::exception
const& e) {
159 __sync_synchronize();
160 if (hmbdc_likely(res)) nextSeq_ = end.seq();
162 }
else if (hmbdc_unlikely(!interestedInMessages
163 || (end.seq() == nextSeq && begin.seq() == nextSeq))) {
164 if (hmbdc_unlikely(__sync_bool_compare_and_swap(&nextSeq_, nextSeq, IGNORE))) {
167 }
catch (std::exception
const& e) {
180 __sync_synchronize();
181 if (hmbdc_likely(res)) nextSeq_ = end.seq();
191 stopped(std::exception
const&e) noexcept {
Definition: TypedString.hpp:84
Definition: Timers.hpp:70
Unknown excpetion.
Definition: Exception.hpp:17
Definition: PoolConsumer.hpp:16
Exception that just has an exit code.
Definition: Exception.hpp:28
Definition: PoolMinusImpl.hpp:24
Definition: LockFreeBufferMisc.hpp:89