1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/pattern/LockFreeBufferMisc.hpp" 4 #include "hmbdc/Exception.hpp" 5 #include "hmbdc/Compile.hpp" 7 #include <boost/smart_ptr/detail/yield_k.hpp> 17 #define HMBDC_YIELD(x) boost::detail::yield(x) 20 namespace hmbdc {
namespace pattern {
namespace memringbuffer_detail {
using namespace std;

struct my_spin_lock : std::atomic_flag {
    my_spin_lock()
    : std::atomic_flag{0} {
        // ATOMIC_FLAG_INIT must be all-zero for the brace init above to yield a cleared flag
        constexpr int f[] = ATOMIC_FLAG_INIT;
        static_assert(0 == f[0], "");
    }

    void lock() {
        while (test_and_set(std::memory_order_acquire)) {
            std::this_thread::yield();
        }
    }

    void unlock() {
        clear(std::memory_order_release);
    }
};
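// Illustrative sketch (not part of the library): my_spin_lock satisfies the BasicLockable
// requirements, so it works with std::lock_guard exactly as takeReentrant() uses it below.
// The names in this snippet are hypothetical.
//
//   my_spin_lock lk;
//   {
//       std::lock_guard<my_spin_lock> g(lk);   // spins via test_and_set until acquired
//       // ... critical section ...
//   }                                          // clear() releases the flag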
template<uint16_t parallel_consumer_count, typename SeqT = size_t>
struct MemRingBuffer {
    using Sequence = SeqT;
    enum {PARALLEL_CONSUMER_COUNT = parallel_consumer_count,};

    const size_t CAPACITY;
    const size_t VALUE_TYPE_SIZE;

private:
    const Sequence READ_SEQ_MAX;
    const size_t MASK;                               // CAPACITY - 1, for cheap modulo
    lf_misc::chunk_base_ptr<Sequence> buffer_;       // ring slot storage (see LockFreeBufferMisc.hpp)
    Sequence toBeClaimedSeq_
        __attribute__((__aligned__(SMP_CACHE_BYTES)));
    Sequence readSeq_[PARALLEL_CONSUMER_COUNT]
        __attribute__((__aligned__(SMP_CACHE_BYTES)));
    Sequence readSeqLastPurge_[PARALLEL_CONSUMER_COUNT]
        __attribute__((__aligned__(SMP_CACHE_BYTES)));
    my_spin_lock locks_[PARALLEL_CONSUMER_COUNT];
    inline __attribute__ ((always_inline))
    Sequence readSeqLow() const HMBDC_RESTRICT {
        Sequence res = readSeq_[0];
        for (uint16_t i = 1; i < PARALLEL_CONSUMER_COUNT; ++i)
            if (res > readSeq_[i]) res = readSeq_[i];
        return res;
    }

    inline __attribute__ ((always_inline))
    uint16_t findSlowestReader() const HMBDC_RESTRICT {
        Sequence smallest = readSeq_[0];
        uint16_t smallestLoc = 0;
        for (uint16_t i = 1; i < PARALLEL_CONSUMER_COUNT; ++i)
            if (smallest > readSeq_[i]) {
                smallest = readSeq_[i];
                smallestLoc = i;
            }
        return smallestLoc;
    }

public:
    using value_type = void *;
    using iterator = lf_misc::iterator<Sequence>;    // slot iterator (see LockFreeBufferMisc.hpp)
    static size_t footprint(uint32_t valueTypeSizePower2Num, uint32_t ringSizePower2Num) {
        return sizeof(MemRingBuffer) + decltype(buffer_)::footprint(
            valueTypeSizePower2Num, ringSizePower2Num) + SMP_CACHE_BYTES;
    }
    template <typename Allocator = os::DefaultAllocator>
    MemRingBuffer(uint32_t valueTypeSizePower2Num, uint32_t ringSizePower2Num
        , Allocator& allocator = os::DefaultAllocator::instance())
    : CAPACITY(1u << ringSizePower2Num)
    , VALUE_TYPE_SIZE((1u << valueTypeSizePower2Num) - sizeof(Sequence))
    , READ_SEQ_MAX(numeric_limits<Sequence>::max() - CAPACITY - 1000u)
    , MASK(CAPACITY - 1)
    , buffer_(valueTypeSizePower2Num, ringSizePower2Num, allocator)
    , toBeClaimedSeq_(0u) {
        fill_n(readSeq_, (int)PARALLEL_CONSUMER_COUNT, 0);
        fill_n(readSeqLastPurge_, (int)PARALLEL_CONSUMER_COUNT, READ_SEQ_MAX);
        // every slot starts with an impossible sequence so readers wait for the first commit
        for (auto i = CAPACITY; i != 0 ; --i) {
            *buffer_.getSeq(i - 1) = numeric_limits<Sequence>::max();
        }
    }
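    // Usage sketch (illustrative only, not part of the header): sizing and constructing a
    // buffer whose slots are 2^6 = 64 bytes (minus the per-slot Sequence) with 2^10 = 1024
    // slots, using the default allocator. Buf, bytesNeeded, and buf are hypothetical names.
    //
    //   using Buf = hmbdc::pattern::MemRingBuffer<2>;     // 2 parallel consumers
    //   auto bytesNeeded = Buf::footprint(6u, 10u);       // storage the buffer will occupy
    //   Buf buf(6u, 10u);                                 // uses os::DefaultAllocator::instance()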
    void put(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        // wait until the slowest reader has freed the claimed slot
        for (uint32_t k = 0;
            seq >= CAPACITY + readSeqLow();
            ++k) {
            HMBDC_YIELD(k);
        }
        size_t index = seq & MASK;
        memcpy(buffer_ + index, item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        *buffer_.getSeq(index) = seq;   // publish: readers spin on the slot's sequence
    }
    bool tryPut(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        // only claim a slot when there is room; give up instead of blocking
        for (auto seq = toBeClaimedSeq_;
            seq < CAPACITY + readSeqLow();
            seq = toBeClaimedSeq_) {
            if (hmbdc_likely(__atomic_compare_exchange_n(
                &toBeClaimedSeq_, &seq, seq + 1, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                size_t index = seq & MASK;
                memcpy(buffer_ + index, item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
                __atomic_thread_fence(__ATOMIC_ACQUIRE);
                *buffer_.getSeq(index) = seq;
                return true;
            }
        }
        return false;
    }
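    // Producer-side sketch (illustrative only): put() blocks until the slowest consumer frees
    // a slot, tryPut() returns false instead of waiting, and killPut() (below) makes room by
    // marking the slowest consumer dead. Msg and buf are hypothetical.
    //
    //   struct Msg { uint64_t id; char payload[48]; };
    //   Msg m{42, {}};
    //   buf.put(&m, sizeof(m));                 // blocking publish
    //   if (!buf.tryPut(&m, sizeof(m))) {       // non-blocking publish
    //       /* buffer full: drop, retry later, or fall back to put() */
    //   }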
    void killPut(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        // never blocks: keep killing the slowest reader until the slot frees up
        while (seq >= CAPACITY + readSeqLow()) {
            uint16_t slowLoc = findSlowestReader();
            markDead(slowLoc);
        }
        size_t index = seq & MASK;
        memcpy(buffer_ + index, item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        *buffer_.getSeq(index) = seq;
    }
    bool isFull() const {
        return toBeClaimedSeq_ >= CAPACITY + readSeqLow();
    }

    Sequence readSeq(uint16_t PARALLEL_CONSUMER_INDEX) const HMBDC_RESTRICT {
        return readSeq_[PARALLEL_CONSUMER_INDEX];
    }
    iterator claim() HMBDC_RESTRICT {
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        for (uint32_t k = 0;
            seq >= CAPACITY + readSeqLow();
            ++k) {
            HMBDC_YIELD(k);
        }
        return iterator(buffer_, seq);
    }
    iterator tryClaim() HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (auto seq = toBeClaimedSeq_;
            seq < CAPACITY + readSeqLow();
            seq = toBeClaimedSeq_) {
            if (hmbdc_likely(__atomic_compare_exchange_n(
                &toBeClaimedSeq_, &seq, seq + 1, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                return iterator(buffer_, seq);
            }
        }
        return iterator();   // empty iterator: no slot available
    }
    iterator tryClaim(size_t n) HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (auto seq = toBeClaimedSeq_;
            seq + n - 1 < CAPACITY + readSeqLow();
            seq = toBeClaimedSeq_) {
            if (hmbdc_likely(__atomic_compare_exchange_n(
                &toBeClaimedSeq_, &seq, seq + n, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                return iterator(buffer_, seq);
            }
        }
        return iterator();   // empty iterator: not enough contiguous room
    }
    iterator claim(size_t n) HMBDC_RESTRICT {
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, n, __ATOMIC_RELAXED);
        for (uint32_t k = 0;
            seq + n > CAPACITY + readSeqLow();
            ++k) {
            HMBDC_YIELD(k);
        }
        return iterator(buffer_, seq);
    }
    iterator killClaim() HMBDC_RESTRICT {
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        while (seq >= CAPACITY + readSeqLow()) {
            uint16_t slowLoc = findSlowestReader();
            markDead(slowLoc);
        }
        return iterator(buffer_, seq);
    }

    iterator killClaim(size_t n) HMBDC_RESTRICT {
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, n, __ATOMIC_RELAXED);
        while (seq + n > CAPACITY + readSeqLow()) {
            uint16_t slowLoc = findSlowestReader();
            markDead(slowLoc);
        }
        return iterator(buffer_, seq);
    }
    void commit(iterator it) HMBDC_RESTRICT {
        __sync_synchronize();
        *buffer_.getSeq(*it - buffer_) = it.seq_;   // publish the claimed slot
    }

    void commit(iterator from, size_t n) HMBDC_RESTRICT {
        __sync_synchronize();
        for (size_t i = 0; i < n; ++i) {
            *buffer_.getSeq(*from - buffer_) = from.seq_;
            ++from;
        }
    }
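    // Zero-copy producer sketch (illustrative only): claim a slot, fill it in place, then
    // commit() to make it visible to consumers. buf and m are hypothetical; how an empty
    // iterator returned by tryClaim() is tested depends on lf_misc::iterator.
    //
    //   auto it = buf.claim();              // blocks until a slot is free
    //   ::memcpy(*it, &m, sizeof(m));       // *it points at the claimed slot
    //   buf.commit(it);
    //
    //   auto batch = buf.claim(4);          // claim 4 contiguous slots
    //   /* fill the 4 slots via the iterator */
    //   buf.commit(batch, 4);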
    void markDead(uint16_t parallel_consumer_index) HMBDC_RESTRICT {
        if (parallel_consumer_index < PARALLEL_CONSUMER_COUNT) {
            readSeq_[parallel_consumer_index] = READ_SEQ_MAX;
            __sync_synchronize();
        }
    }

    vector<uint16_t> unusedConsumerIndexes() const {
        vector<uint16_t> res;
        for (uint16_t i = 0; i < PARALLEL_CONSUMER_COUNT; ++i) {
            if (readSeq_[i] == READ_SEQ_MAX) {
                res.push_back(i);
            }
        }
        return res;
    }
    void take(uint16_t PARALLEL_CONSUMER_INDEX, void * HMBDC_RESTRICT item
        , size_t sizeHint = 0) HMBDC_RESTRICT {
        auto seq = readSeq_[PARALLEL_CONSUMER_INDEX];
        if (hmbdc_unlikely(seq >= READ_SEQ_MAX)) {
            HMBDC_THROW(DeadConsumer, PARALLEL_CONSUMER_INDEX);
        }
        size_t index = seq & MASK;
        // wait until a producer has committed this slot
        for (uint32_t k = 0;
            seq != *buffer_.getSeq(index);
            ++k) {
            HMBDC_YIELD(k);
        }
        memcpy(item, buffer_ + index, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
        __atomic_fetch_add(readSeq_ + PARALLEL_CONSUMER_INDEX, 1, __ATOMIC_RELEASE);
    }

    void takeReentrant(uint16_t PARALLEL_CONSUMER_INDEX, void * HMBDC_RESTRICT item
        , size_t sizeHint = 0) HMBDC_RESTRICT {
        lock_guard<my_spin_lock> guard(locks_[PARALLEL_CONSUMER_INDEX]);
        take(PARALLEL_CONSUMER_INDEX, item, sizeHint);
    }
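    // Consumer-side sketch (illustrative only): each consumer owns one index in
    // [0, PARALLEL_CONSUMER_COUNT), and every consumer sees every item. take() blocks until
    // the next item is committed; a consumer already marked dead throws DeadConsumer.
    // Msg and buf are hypothetical.
    //
    //   Msg out;
    //   buf.take(0, &out, sizeof(out));             // consumer 0, blocking
    //   buf.takeReentrant(0, &out, sizeof(out));    // same, safe if threads share index 0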
    iterator peek(uint16_t PARALLEL_CONSUMER_INDEX) const HMBDC_RESTRICT {
        auto readSeq = readSeq_[PARALLEL_CONSUMER_INDEX];
        if (hmbdc_unlikely(readSeq >= READ_SEQ_MAX)) {
            HMBDC_THROW(DeadConsumer, PARALLEL_CONSUMER_INDEX);
        }
        if (readSeq == *buffer_.getSeq(readSeq & MASK)) {
            return iterator(buffer_, readSeq);
        }
        return iterator();   // nothing committed at the read position yet
    }

    size_t peek(uint16_t PARALLEL_CONSUMER_INDEX, iterator& begin, iterator& end
        , size_t maxPeekSize = numeric_limits<size_t>::max()) const {
        Sequence readSeq = readSeq_[PARALLEL_CONSUMER_INDEX];
        if (hmbdc_unlikely(readSeq >= READ_SEQ_MAX)) {
            HMBDC_THROW(DeadConsumer, PARALLEL_CONSUMER_INDEX);
        }
        begin = iterator(buffer_, readSeq);
        while (readSeq == *buffer_.getSeq(readSeq & MASK)
            && maxPeekSize--) {
            ++readSeq;
        }
        end = iterator(buffer_, readSeq);
        return readSeq - readSeq_[PARALLEL_CONSUMER_INDEX];
    }
    size_t peekSome(uint16_t PARALLEL_CONSUMER_INDEX, iterator& begin, iterator& end
        , size_t maxPeekSize = numeric_limits<size_t>::max()) const {
        size_t res;
        // retry a bounded number of yields before reporting an empty peek
        for (uint32_t k = 0;
            !(res = peek(PARALLEL_CONSUMER_INDEX, begin, end, maxPeekSize))
                && k < 64;
            ++k) {
            HMBDC_YIELD(k);
        }
        return res;
    }
    void waste(uint16_t PARALLEL_CONSUMER_INDEX, size_t size) HMBDC_RESTRICT {
        Sequence seq = readSeq_[PARALLEL_CONSUMER_INDEX];
        if (hmbdc_unlikely(seq >= READ_SEQ_MAX)) {
            HMBDC_THROW(DeadConsumer, PARALLEL_CONSUMER_INDEX);
        }
        // wait until the discarded items have actually been claimed by producers
        for (uint32_t k = 0;
            seq + size > toBeClaimedSeq_;
            ++k) {
            HMBDC_YIELD(k);
        }
        __atomic_fetch_add(readSeq_ + PARALLEL_CONSUMER_INDEX, size, __ATOMIC_RELEASE);
    }

    // for batching: mark the next size items consumed after a successful peek
    void wasteAfterPeek(uint16_t PARALLEL_CONSUMER_INDEX, size_t size) HMBDC_RESTRICT {
        __atomic_fetch_add(readSeq_ + PARALLEL_CONSUMER_INDEX, size, __ATOMIC_RELEASE);
    }
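    // Batching consumer sketch (illustrative only): peek() exposes committed items without
    // copying, and wasteAfterPeek() then marks them consumed so producers can reuse the
    // slots. Buf and buf are hypothetical; in-place iteration details depend on lf_misc::iterator.
    //
    //   Buf::iterator begin, end;
    //   if (auto n = buf.peekSome(0, begin, end)) {
    //       /* process the n committed items in [begin, end) in place */
    //       buf.wasteAfterPeek(0, n);
    //   }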
    Sequence catchUpWith(uint16_t PARALLEL_CONSUMER_INDEX, uint16_t WITH_PARALLEL_CONSUMER_INDEX) {
        readSeq_[PARALLEL_CONSUMER_INDEX] = readSeq_[WITH_PARALLEL_CONSUMER_INDEX];
        __atomic_thread_fence(__ATOMIC_ACQ_REL);
        return readSeq_[PARALLEL_CONSUMER_INDEX];
    }

    void catchUpTo(uint16_t PARALLEL_CONSUMER_INDEX, Sequence newSeq) {
        if (readSeq_[PARALLEL_CONSUMER_INDEX] <= newSeq) {
            readSeq_[PARALLEL_CONSUMER_INDEX] = newSeq;
            __atomic_thread_fence(__ATOMIC_RELEASE);
        }
    }
    size_t remainingSize(uint16_t index) const HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        Sequence r = readSeq_[index];
        Sequence w = toBeClaimedSeq_;
        return w > r ? w - r : 0;
    }

    size_t remainingSize() const HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        Sequence r = readSeqLow();
        Sequence w = toBeClaimedSeq_;
        return w > r ? w - r : 0;
    }

    void reset(uint16_t PARALLEL_CONSUMER_INDEX) {
        size_t index;
        // move the read position to the current claim point
        do {
            readSeq_[PARALLEL_CONSUMER_INDEX] = toBeClaimedSeq_;
            index = readSeq_[PARALLEL_CONSUMER_INDEX] & MASK;
            __atomic_thread_fence(__ATOMIC_ACQ_REL);
        } while (*buffer_.getSeq(index) == readSeq_[PARALLEL_CONSUMER_INDEX]);
    }
    size_t parallelConsumerAlive() const {
        return count_if(readSeq_, readSeq_ + PARALLEL_CONSUMER_COUNT
            , [m = READ_SEQ_MAX](Sequence s) {
                return s < m;
        });
    }
    // call this periodically to mark stuck consumers dead; returns a bitmask of the
    // consumer indexes purged in this pass
    uint64_t purge() {
        uint64_t res = 0;
        __sync_synchronize();
        for (uint16_t i = 0; i < PARALLEL_CONSUMER_COUNT; ++i) {
            auto seq = readSeq_[i];
            if (seq < READ_SEQ_MAX) {
                size_t index = seq & MASK;
                if (hmbdc_unlikely(readSeqLastPurge_[i] == seq)) {
                    // no progress since the last pass; only the slowest reader is considered stuck
                    if (i == findSlowestReader()) {
                        res |= (1ul << i);
                    }
                } else if (hmbdc_unlikely(seq == *buffer_.getSeq(index))) {
                    readSeqLastPurge_[i] = seq;
                }
            }
        }
        for (uint16_t i = 0; res && i < PARALLEL_CONSUMER_COUNT; ++i) {
            if (res & (1ul << i)) markDead(i);
        }
        return res;
    }
};

} // namespace memringbuffer_detail
}} // namespace hmbdc::pattern
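// Housekeeping sketch (illustrative only): a monitoring thread can call purge() periodically
// so a stuck consumer cannot stall producers forever; parallelConsumerAlive() and
// unusedConsumerIndexes() report the resulting state. buf is hypothetical.
//
//   if (auto purged = buf.purge()) {
//       /* bit i set in purged => consumer i was marked dead this pass */
//   }
//   auto alive = buf.parallelConsumerAlive();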
458 #include "hmbdc/pattern/MemRingBuffer2.hpp" 460 namespace hmbdc {
namespace pattern {
461 template<u
int16_t PARALLEL_CONSUMER_COUNT,
typename SeqT =
size_t>
462 using MemRingBuffer = memringbuffer_detail::MemRingBuffer<PARALLEL_CONSUMER_COUNT, SeqT>;
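// End-to-end sketch (illustrative only, assuming this header and its dependencies are on the
// include path): one producer thread and two consumer threads sharing a MemRingBuffer<2>.
// Msg and the local names below are hypothetical.
//
//   #include "hmbdc/pattern/MemRingBuffer.hpp"
//   #include <thread>
//
//   struct Msg { uint64_t id; char payload[48]; };
//
//   int main() {
//       hmbdc::pattern::MemRingBuffer<2> buf(6u, 10u);   // 64-byte slots, 1024 of them
//
//       std::thread consumers[2];
//       for (uint16_t c = 0; c < 2; ++c) {
//           consumers[c] = std::thread([&buf, c] {
//               Msg m;
//               for (int i = 0; i < 1000; ++i) buf.take(c, &m, sizeof(m));
//           });
//       }
//       for (uint64_t i = 0; i < 1000; ++i) {
//           Msg m{i, {}};
//           buf.put(&m, sizeof(m));     // blocks whenever either consumer lags a full ring
//       }
//       for (auto& t : consumers) t.join();
//   }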