// NOTE: include set reconstructed for this listing; SMP_CACHE_BYTES,
// HMBDC_RESTRICT, hmbdc_likely/hmbdc_unlikely and os::DefaultAllocator come
// from hmbdc's own config/OS headers, whose paths are not shown here.
#include <hmbdc/pattern/LockFreeBufferMisc.hpp> // lf_misc::chunk_base_ptr, lf_misc::iterator
#include <boost/smart_ptr/detail/yield_k.hpp>   // boost::detail::yield
#include <cstring>  // memcpy
#include <cstdint>
#include <limits>

namespace hmbdc { namespace pattern {

namespace memringbuffer_detail {

// primary template; declared here so the 0-consumer specialization below compiles
template <uint16_t PARALLEL_CONSUMER_COUNT, typename SeqT> struct MemRingBuffer;

// specialization: single (non-parallel) consumer side
template<typename SeqT>
struct MemRingBuffer<0u, SeqT> {
    enum {PARALLEL_CONSUMER_COUNT = 0,};
    using Sequence = SeqT;

    const Sequence CAPACITY;
    const SeqT VALUE_TYPE_SIZE;

private:
    const Sequence MASK;
    // writer and reader cursors on their own cache lines to avoid false sharing
    Sequence toBeClaimedSeq_
        __attribute__((__aligned__(SMP_CACHE_BYTES)));
    Sequence readSeq_
        __attribute__((__aligned__(SMP_CACHE_BYTES)));
    // payload chunks, each carrying a per-slot sequence (see LockFreeBufferMisc.hpp)
    lf_misc::chunk_base_ptr<SeqT> buffer_;
    bool readyToWriteOver(Sequence seq) const {
        return *buffer_.getSeq(seq & MASK) == std::numeric_limits<Sequence>::max();
    }
    bool readyToWriteOver(Sequence seq, size_t n) const {
        for (auto i = 0ul; i < n; ++i) {
            // check each slot in the claimed range, not just the first one
            if (*buffer_.getSeq((seq + i) & MASK) != std::numeric_limits<Sequence>::max()) {
                return false;
            }
        }
        return true;
    }
    void markAsToWriteOver(Sequence seq) {
        *buffer_.getSeq(seq & MASK) = std::numeric_limits<Sequence>::max();
    }
public:
    using value_type = void *;
    // iterator over claimed/peeked slots (LockFreeBufferMisc.hpp); the exact
    // alias line was lost in extraction and is reconstructed here
    using iterator = lf_misc::iterator<SeqT>;
    template <typename Allocator = os::DefaultAllocator>
    MemRingBuffer(uint32_t valueTypeSizePower2Num, uint32_t ringSizePower2Num
        , Allocator& allocator = os::DefaultAllocator::instance())
    : CAPACITY(1u << ringSizePower2Num)
    , VALUE_TYPE_SIZE((1u << valueTypeSizePower2Num) - sizeof(Sequence))
    , MASK(CAPACITY - 1)
    , toBeClaimedSeq_(0u)
    , readSeq_(0u)
    , buffer_(valueTypeSizePower2Num, ringSizePower2Num, allocator) {
        // start with every slot writable
        for (auto i = CAPACITY; i != 0; --i) {
            markAsToWriteOver(i - 1);
        }
    }
    static size_t footprint(uint32_t valueTypeSizePower2Num, uint32_t ringSizePower2Num) {
        return sizeof(MemRingBuffer) + lf_misc::chunk_base_ptr<SeqT>::footprint(
            valueTypeSizePower2Num, ringSizePower2Num) + SMP_CACHE_BYTES;
    }
    void put(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        for (uint32_t k = 0;
            seq >= CAPACITY + readSeq_ || !readyToWriteOver(seq);
            ++k) {
            boost::detail::yield(k); // spin until the claimed slot is free
        }
        auto index = seq & MASK;
        memcpy(buffer_ + index
            , item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        *buffer_.getSeq(index) = seq; // publish: slot seq now matches, readers may take
    }
    bool tryPut(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (auto seq = toBeClaimedSeq_;
            seq < CAPACITY + readSeq_ && readyToWriteOver(seq);
            seq = toBeClaimedSeq_) {
            if (hmbdc_likely(__sync_bool_compare_and_swap(
                &toBeClaimedSeq_, seq, seq + 1))) {
                auto index = seq & MASK;
                memcpy(buffer_ + index
                    , item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
                __atomic_thread_fence(__ATOMIC_ACQUIRE);
                *buffer_.getSeq(index) = seq;
                return true;
            }
        }
        return false; // ring full or slot not yet recycled
    }
    bool isFull() const {
        return toBeClaimedSeq_ >= CAPACITY + readSeq_;
    }
    SeqT readSeq() const {
        return readSeq_;
    }
    // single-slot claim; the signature line was lost in extraction and is
    // reconstructed to mirror the claim(size_t) overload below
    iterator claim() HMBDC_RESTRICT {
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        for (uint32_t k = 0;
            seq >= CAPACITY + readSeq_
                || !readyToWriteOver(seq);
            ++k) {
            boost::detail::yield(k);
        }
        return iterator(buffer_, seq);
    }
    iterator tryClaim() HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (auto seq = toBeClaimedSeq_;
            seq < CAPACITY + readSeq_
                && readyToWriteOver(seq);
            seq = toBeClaimedSeq_) {
            if (hmbdc_likely(__atomic_compare_exchange_n(
                &toBeClaimedSeq_, &seq, seq + 1,
                true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                return iterator(buffer_, seq);
            }
        }
        return iterator(); // empty iterator: nothing claimed
    }
    iterator tryClaim(size_t n) HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (auto seq = toBeClaimedSeq_;
            seq + n - 1 < CAPACITY + readSeq_
                && readyToWriteOver(seq, n);
            seq = toBeClaimedSeq_) {
            if (hmbdc_likely(__atomic_compare_exchange_n(
                &toBeClaimedSeq_, &seq, seq + n,
                true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                return iterator(buffer_, seq);
            }
        }
        return iterator();
    }
    iterator claim(size_t n) HMBDC_RESTRICT {
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, n, __ATOMIC_RELAXED);
        for (uint32_t k = 0;
            seq + n > CAPACITY + readSeq_;
            ++k) {
            boost::detail::yield(k);
        }
        for (uint32_t i = 0, k = 0; i < n;) {
            if (!readyToWriteOver(seq + i)) {
                boost::detail::yield(k++);
            } else {
                ++i; // slot i is writable; move on to the next one
            }
        }
        return iterator(buffer_, seq);
    }
    void commit(iterator it) HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        *buffer_.getSeq(*it - buffer_) = it.seq_;
    }
    void commit(iterator from, size_t n) HMBDC_RESTRICT {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (size_t i = 0; i < n; ++i) {
            *buffer_.getSeq(*from - buffer_) = from.seq_;
            ++from; // advance was lost in extraction; without it the same slot commits n times
        }
    }
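    // Illustrative sketch (not part of the original header): how claim() and
    // commit() pair up for a zero-copy write; payload and len are made up
    // for the example.
    //
    //   auto it = buf.claim();       // spins until a slot is writable
    //   memcpy(*it, payload, len);   // *it is the slot's raw storage
    //   buf.commit(it);              // publish: the slot becomes visible to the reader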
    void take(void * HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        auto seq = readSeq_;
        do {
            auto index = seq & MASK;
            for (uint32_t k = 0;
                seq != *buffer_.getSeq(index);
                ++k) {
                boost::detail::yield(k); // wait until the slot is committed
            }
            memcpy(item, buffer_ + index, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &seq, seq + 1,
            true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        markAsToWriteOver(seq);
    }
    bool tryTake(void * HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        auto seq = readSeq_;
        do {
            auto index = seq & MASK;
            if (seq != *buffer_.getSeq(index)) return false;
            memcpy(item, buffer_ + index, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &seq, seq + 1,
            true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        markAsToWriteOver(seq);
        return true;
    }
    // single-item flavor: the signature line did not survive extraction, so
    // the name and parameters below are a best guess from the surviving body
    size_t peekOne(iterator& begin, iterator& end) HMBDC_RESTRICT {
        Sequence seq, readSeq;
        do {
            seq = readSeq = readSeq_;
            begin = iterator(buffer_, seq);
            if (seq == *buffer_.getSeq(seq & MASK)) {
                ++seq; // one committed item available
            }
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &readSeq, seq,
            true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        end = iterator(buffer_, seq);
        return seq - readSeq;
    }
    size_t peek(iterator& begin, iterator& end
        , size_t maxPeekSize = std::numeric_limits<size_t>::max()) HMBDC_RESTRICT {
        Sequence seq, readSeq;
        do {
            seq = readSeq = readSeq_;
            begin = iterator(buffer_, seq);
            auto count = maxPeekSize;
            while (count--
                && seq == *buffer_.getSeq(seq & MASK)) {
                ++seq; // advance over committed items
            }
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &readSeq, seq,
            true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        end = iterator(buffer_, seq);
        return seq - readSeq;
    }
    // spins until at least one item can be peeked; the lost signature line is
    // reconstructed here after the peekSome name used across hmbdc buffers
    size_t peekSome(iterator& begin, iterator& end
        , size_t maxPeekSize = std::numeric_limits<size_t>::max()) {
        size_t res;
        for (uint32_t k = 0;
            !(res = peek(begin, end, maxPeekSize));
            ++k) {
            boost::detail::yield(k);
        }
        return res;
    }
    // blocking flavor: waits until every claimed slot has been committed and
    // peeks the whole backlog; the name is assumed, the signature line being lost
    size_t peekAll(iterator& begin, iterator& end) HMBDC_RESTRICT {
        Sequence seq, readSeq;
        do {
            seq = readSeq = readSeq_;
            begin = iterator(buffer_, seq);
            for (uint32_t k = 0;
                seq < toBeClaimedSeq_;
                boost::detail::yield(k++)) {
                while (seq == *buffer_.getSeq(seq & MASK)) {
                    ++seq;
                }
            }
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &readSeq, seq,
            true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        end = iterator(buffer_, seq);
        return seq - readSeq;
    }
    // for batching: marks the next `size` peeked items consumed and recycles
    // their slots for writers
    void wasteAfterPeek(iterator begin, size_t size, bool incomplete = false) HMBDC_RESTRICT {
        if (!size && !incomplete) return;
        auto seq = begin.seq();
        for (; seq < begin.seq() + size; seq++) {
            markAsToWriteOver(seq);
        }
        if (incomplete) readSeq_ = seq;
        __atomic_thread_fence(__ATOMIC_RELEASE);
    }
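    // Illustrative sketch (not part of the original header): batch consume via
    // peek() and wasteAfterPeek(); the process() call is made up.
    //
    //   MemRingBuffer<0u, uint64_t>::iterator b, e;
    //   if (auto n = buf.peek(b, e)) {
    //       for (auto it = b; it != e; ++it) process(*it);
    //       buf.wasteAfterPeek(b, n); // hand the slots back to writers
    //   }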
    size_t remainingSize() const HMBDC_RESTRICT {
        __sync_synchronize();
        Sequence r = readSeq_;
        Sequence w = toBeClaimedSeq_;
        return w > r ? w - r : 0;
    }

    // drops everything buffered and makes all slots writable again; the
    // signature line was lost, the name reset is assumed
    void reset() {
        __sync_synchronize();
        for (auto i = 0ul; i < CAPACITY; ++i) {
            markAsToWriteOver(i);
        }
        readSeq_ = toBeClaimedSeq_;
    }
};

} // namespace memringbuffer_detail
}} // namespace hmbdc::pattern
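// Minimal end-to-end sketch (assumed usage, not from the original header):
// one producer calls put(), one consumer calls take(); slot and ring sizes
// are given as powers of two, matching the constructor above.
//
//   using Buf = hmbdc::pattern::memringbuffer_detail::MemRingBuffer<0u, uint64_t>;
//   Buf buf(6u, 10u);             // 64-byte chunks (minus the seq word), 1024 slots
//   uint64_t in = 42, out = 0;
//   buf.put(&in, sizeof(in));     // blocks while the ring is full
//   buf.take(&out, sizeof(out));  // blocks until an item is committed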