// hmbdc - simplify high-performance messaging programming
// MemRingBuffer2.hpp
1 namespace hmbdc { namespace pattern {
2 
3 namespace memringbuffer_detail {
/// Specialization of MemRingBuffer for PARALLEL_CONSUMER_COUNT == 0:
/// a lock-free multi-producer/multi-consumer ring buffer in which each
/// slot carries its own sequence number (accessed via buffer_.getSeq())
/// for the publish/consume handshake.  A slot whose sequence equals
/// numeric_limits<Sequence>::max() is free for producers to write over.
template<typename SeqT>
class MemRingBuffer<0u, SeqT> {
public:
    using Sequence = SeqT;
    enum {PARALLEL_CONSUMER_COUNT = 0,}; //not used

    const SeqT CAPACITY;        // slot count; always a power of two (see ctor)
    const SeqT VALUE_TYPE_SIZE; // payload bytes per slot: slot size minus the per-slot Sequence
private:
    const Sequence MASK;        // CAPACITY - 1; maps a sequence number to its slot index

    // Next sequence a producer will claim.  Aligned to its own cache line
    // so producer and consumer counters do not false-share.
    Sequence toBeClaimedSeq_
    __attribute__((__aligned__(SMP_CACHE_BYTES)));
    // Next sequence a consumer will read.
    Sequence readSeq_
    __attribute__((__aligned__(SMP_CACHE_BYTES)));
21  bool readyToWriteOver(Sequence seq) const {
22  return *buffer_.getSeq(seq & MASK) == std::numeric_limits<Sequence>::max();
23  }
24 
25  bool readyToWriteOver(Sequence seq, size_t n) const {
26  for (auto i = 0ul; i < n; ++i) {
27  if (*buffer_.getSeq(seq & MASK) != std::numeric_limits<Sequence>::max()) {
28  return false;
29  }
30  }
31  return true;
32  }
33 
34  void markAsToWriteOver(Sequence seq) {
35  *buffer_.getSeq(seq & MASK) = std::numeric_limits<Sequence>::max();
36  }
37 
38 
39 public:
40  using value_type = void *;
41  using DeadConsumer = hmbdc::pattern::lf_misc::DeadConsumer; //not used, just for compiling
43 
    /// Construct the ring.
    /// @param valueTypeSizePower2Num log2 of the full slot size in bytes
    ///        (payload plus the per-slot Sequence)
    /// @param ringSizePower2Num      log2 of the slot count
    /// @param allocator              allocator for the backing storage
    template <typename Allocator = os::DefaultAllocator>
    MemRingBuffer(uint32_t valueTypeSizePower2Num, uint32_t ringSizePower2Num
        , Allocator& allocator = os::DefaultAllocator::instance())
        : CAPACITY(1u << ringSizePower2Num)
        , VALUE_TYPE_SIZE((1u << valueTypeSizePower2Num) - sizeof(Sequence))
        , MASK(CAPACITY - 1)
        , buffer_(valueTypeSizePower2Num, ringSizePower2Num, (allocator))
        , toBeClaimedSeq_(0u)
        , readSeq_(0u) {
        // mark every slot writable so producers can claim from a fresh ring
        for (auto i = CAPACITY; i != 0 ; --i) {
            markAsToWriteOver(i - 1);
        }
    }
57 
58  static
59  size_t footprint(uint32_t valueTypeSizePower2Num, uint32_t ringSizePower2Num) {
60  return sizeof(MemRingBuffer) + decltype(buffer_)::footprint(
61  valueTypeSizePower2Num, ringSizePower2Num) + SMP_CACHE_BYTES;
62  }
63 
64  void put(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
65  // Sequence seq = __sync_fetch_and_add(&toBeClaimedSeq_, 1);
66  Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
67  for (uint32_t k = 0;
68  seq >= CAPACITY + readSeq_ || !readyToWriteOver(seq);
69  ++k) {
70  boost::detail::yield(k);
71  }
72  auto index = seq & MASK;
73  memcpy(buffer_ + index
74  , item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
75  // __sync_synchronize();
76  __atomic_thread_fence(__ATOMIC_ACQUIRE);
77  *buffer_.getSeq(index) = seq;
78  }
79 
80  bool tryPut(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
81  // __sync_synchronize();
82  __atomic_thread_fence(__ATOMIC_ACQUIRE);
83  for(auto seq = toBeClaimedSeq_;
84  seq < CAPACITY + readSeq_ && readyToWriteOver(seq);
85  seq = toBeClaimedSeq_) {
86  if (hmbdc_likely(__sync_bool_compare_and_swap(
87  &toBeClaimedSeq_, seq, seq + 1))) {
88  auto index = seq & MASK;
89  memcpy(buffer_ + index
90  , item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
91  // __sync_synchronize();
92  __atomic_thread_fence(__ATOMIC_ACQUIRE);
93  *buffer_.getSeq(index) = seq;
94  return true;
95  }
96  }
97  return false;
98  }
99 
100  bool isFull() const {
101  return toBeClaimedSeq_ >= CAPACITY + readSeq_;
102  }
103 
    /// Current read (consumer) sequence.
    SeqT readSeq() const {
        return readSeq_;
    }
107 
    /// Claim one slot for writing, spinning until it becomes available.
    /// The claimed slot must later be published via commit().
    /// @return iterator at the claimed slot
    iterator claim() HMBDC_RESTRICT {
        // Sequence seq = __sync_fetch_and_add(&toBeClaimedSeq_, 1);
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        // spin until seq is inside the ring window and its previous occupant
        // has been consumed
        for (uint32_t k = 0;
            seq >= CAPACITY + readSeq_
            || !readyToWriteOver(seq);
            ++k) {
            boost::detail::yield(k);
        }
        return iterator(buffer_, seq);
    }
119 
    /// Try to claim one slot for writing without blocking.
    /// @return iterator at the claimed slot, or a default (empty) iterator
    ///         when the ring is full or the slot is not yet consumed
    iterator tryClaim() HMBDC_RESTRICT {
        // acquire fence: see the freshest readSeq_ / slot sequences below
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for(auto seq = toBeClaimedSeq_;
            seq < CAPACITY + readSeq_
            && readyToWriteOver(seq);
            seq = toBeClaimedSeq_) {
            // weak CAS: a spurious failure just re-runs the loop, which
            // reloads seq from toBeClaimedSeq_ anyway
            // if (hmbdc_likely(__sync_bool_compare_and_swap(&toBeClaimedSeq_, seq, seq + 1)))
            if (hmbdc_likely(__atomic_compare_exchange_n (
                &toBeClaimedSeq_, &seq, seq + 1, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                return iterator(buffer_, seq);
            }
        }
        return iterator();
    }
135 
    /// Try to claim @p n consecutive slots without blocking.
    /// @param n number of consecutive slots wanted
    /// @return iterator at the first claimed slot, or an empty iterator if
    ///         the run does not fit in the window or is not fully consumed
    iterator tryClaim(size_t n) HMBDC_RESTRICT {
        // acquire fence: see the freshest readSeq_ / slot sequences below
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for(auto seq = toBeClaimedSeq_;
            seq + n - 1 < CAPACITY + readSeq_
            && readyToWriteOver(seq, n);
            seq = toBeClaimedSeq_) {
            // claim the whole run [seq, seq + n) in one CAS
            // if (hmbdc_likely(__sync_bool_compare_and_swap(&toBeClaimedSeq_, seq, seq + n))) {
            if (hmbdc_likely(__atomic_compare_exchange_n (
                &toBeClaimedSeq_, &seq, seq + n, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                return iterator(buffer_, seq);
            }
        }
        return iterator();
    }
151 
    /// Claim @p n consecutive slots, spinning until all become available.
    /// Each claimed slot must later be published via commit().
    /// @return iterator at the first claimed slot
    iterator claim(size_t n) HMBDC_RESTRICT {
        // Sequence seq = __sync_fetch_and_add(&toBeClaimedSeq_, n);
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, n, __ATOMIC_RELAXED);
        // wait until the whole run [seq, seq + n) fits in the ring window
        for (uint32_t k = 0;
            seq + n > CAPACITY + readSeq_;
            ++k) {
            boost::detail::yield(k);
        }
        // then wait for each slot of the run to be released by its reader;
        // k resets whenever a slot turns out ready
        for (uint32_t i = 0, k = 0; i < n;) {
            if (!readyToWriteOver(seq + i)) {
                boost::detail::yield(k++);
            } else {
                k = 0;
                i++;
            }
        }
        return iterator(buffer_, seq);
    }
170 
171  void commit(iterator it) HMBDC_RESTRICT {
172  // __sync_synchronize();
173  __atomic_thread_fence(__ATOMIC_ACQUIRE);
174  *buffer_.getSeq(*it - buffer_) = it.seq_;
175  }
176 
177  void commit(iterator from, size_t n) HMBDC_RESTRICT {
178  // __sync_synchronize();
179  __atomic_thread_fence(__ATOMIC_ACQUIRE);
180  for (size_t i = 0; i < n; ++i) {
181  *buffer_.getSeq(*from - buffer_) = from.seq_;
182  ++from;
183  }
184  }
185 
    /// Blockingly take (copy out) the next item; multi-consumer safe.
    /// Spins until the item at readSeq_ is published, then races other
    /// consumers via CAS on readSeq_; the winner releases the slot back
    /// to producers.
    /// @param item     destination buffer
    /// @param sizeHint bytes to copy; 0 means copy VALUE_TYPE_SIZE bytes
    void take(void * HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        Sequence seq;
        do {
            seq = readSeq_;
            auto index = seq & MASK;
            // wait until the producer publishes this sequence into the slot
            for (uint32_t k = 0;
                seq != *buffer_.getSeq(index);
                ++k) {
                boost::detail::yield(k);
            }
            memcpy(item, buffer_ + index, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
            // lost the race to another consumer: redo with the new readSeq_
            // } while (hmbdc_unlikely(!__sync_bool_compare_and_swap(&readSeq_, seq, seq + 1)));
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &seq, seq + 1, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        // hand the consumed slot back to producers
        markAsToWriteOver(seq);
    }
202 
    /// Non-blocking take: copy out the next item if it is published.
    /// @param item     destination buffer
    /// @param sizeHint bytes to copy; 0 means copy VALUE_TYPE_SIZE bytes
    /// @return true if an item was consumed, false if none was ready
    bool tryTake(void * HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        Sequence seq;
        do {
            seq = readSeq_;
            auto index = seq & MASK;
            // nothing published at the current read position
            if (seq != *buffer_.getSeq(index)) return false;
            memcpy(item, buffer_ + index, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
            // lost the race to another consumer: retry with the new readSeq_
            // } while (hmbdc_unlikely(!__sync_bool_compare_and_swap(&readSeq_, seq, seq + 1)));
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &seq, seq + 1, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        // hand the consumed slot back to producers
        markAsToWriteOver(seq);
        return true;
    }
216 
    /// Atomically peek one published item without releasing its slot.
    /// On success readSeq_ is advanced past the item; the slot stays live
    /// until wasteAfterPeek() hands it back to producers.
    /// @return iterator at the item, or an empty iterator if nothing is
    ///         published at the current read position
    iterator peek() HMBDC_RESTRICT {
        Sequence seq, readSeq;
        iterator res;
        do {
            res.clear();
            readSeq = readSeq_;
            seq = readSeq;
            // a slot is published iff it carries its own sequence number
            if (seq == *buffer_.getSeq(seq & MASK)) {
                res = iterator(buffer_, seq++);
            }
            // } while (hmbdc_unlikely(!__sync_bool_compare_and_swap(&readSeq_, readSeq, seq)));
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &readSeq, seq, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        return res;
    }
232 
    /// Peek a contiguous run of published items, advancing readSeq_ past
    /// the run; the slots stay live until wasteAfterPeek() releases them.
    /// @param begin       [out] start of the peeked run
    /// @param end         [out] one past the last peeked item
    /// @param maxPeekSize cap on how many items to peek
    /// @return end - begin, the number of items peeked
    size_t peek(iterator& begin, iterator& end
        , size_t maxPeekSize = std::numeric_limits<size_t>::max()) HMBDC_RESTRICT {
        Sequence seq, readSeq;
        do {
            readSeq = readSeq_;
            seq = readSeq;
            begin = iterator(buffer_, seq);
            auto count = maxPeekSize;
            // walk forward while slots are published, up to the cap
            while (count--
                && seq == *buffer_.getSeq(seq & MASK)) {
                ++seq;
            }
            end = iterator(buffer_, seq);
            // } while (hmbdc_unlikely(!__sync_bool_compare_and_swap(&readSeq_, readSeq, seq)));
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &readSeq, seq, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        return end - begin;
    }
251 
252  size_t peekSome(iterator& begin, iterator& end
253  , size_t maxPeekSize = numeric_limits<size_t>::max()) {
254  size_t res;
255  for (uint32_t k = 0;
256  !(res = peek(begin, end, maxPeekSize))
257  && k < 64;
258  ++k) {
259  HMBDC_YIELD(k);
260  }
261  return res;
262  }
263 
    /// Peek every item producers have claimed, spinning until each claimed
    /// slot is actually published, then advance readSeq_ past the whole run.
    /// NOTE(review): if a producer claims but is slow to commit, this spins
    /// (yielding) until that commit lands.
    /// @param begin [out] start of the peeked run
    /// @param end   [out] one past the last peeked item
    /// @return number of items peeked
    size_t peekAll(iterator& begin, iterator& end) HMBDC_RESTRICT {
        Sequence seq, readSeq;
        do {
            readSeq = readSeq_;
            seq = readSeq;
            begin = iterator(buffer_, seq);

            // absorb published slots until we catch up with the producers'
            // claim point; yield while waiting on stragglers
            for (uint32_t k = 0;
                seq < toBeClaimedSeq_;
                boost::detail::yield(k++)) {
                while (seq == *buffer_.getSeq(seq & MASK)) {
                    ++seq;
                }
            }
            end = iterator(buffer_, seq);
            // } while (hmbdc_unlikely(!__sync_bool_compare_and_swap(&readSeq_, readSeq, seq)));
        } while (hmbdc_unlikely(!__atomic_compare_exchange_n(
            &readSeq_, &readSeq, seq, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)));
        return end - begin;
    }
284 
    /// Release previously peeked slots back to the producers.
    /// @param begin      iterator returned by peek()/peekAll()
    /// @param size       number of slots to release
    /// @param incomplete when true, also set readSeq_ to just past the
    ///                   released run
    void wasteAfterPeek(iterator begin, size_t size, bool incomplete = false) HMBDC_RESTRICT {
        if (!size && !incomplete) return;
        auto seq = begin.seq();
        for (; seq < begin.seq() + size; seq++) {
            markAsToWriteOver(seq);
        }
        //this is very dangerous if size is not matching;
        //if any reads other than the matching peek happened
        //bad things happen
        if (incomplete) readSeq_ = seq;
        // __sync_synchronize();
        // NOTE(review): a trailing release fence with no subsequent store
        // publishes nothing by itself - presumably relies on x86-TSO;
        // confirm on weakly-ordered architectures
        __atomic_thread_fence(__ATOMIC_RELEASE);
    }
298 
299  size_t remainingSize() const HMBDC_RESTRICT {
300  __sync_synchronize();
301  Sequence r = readSeq_;
302  Sequence w = toBeClaimedSeq_;
303  return w > r ? w - r : 0;
304  }
305 
306  void reset() {
307  __sync_synchronize();
308  for (auto i = 0ul; i < CAPACITY; ++i) {
309  markAsToWriteOver(i);
310  }
311  readSeq_ = toBeClaimedSeq_;
312  }
313 };
314 } //memringbuffer_detail
315 
316 
317 }} // end namespace hmbdc::pattern
318 
// --- Doxygen cross-reference residue from extraction, kept as comments ---
// void wasteAfterPeek(uint16_t PARALLEL_CONSUMER_INDEX, size_t size) HMBDC_RESTRICT
//   for batching process, mark the next items consumed
//   Definition: MemRingBuffer.hpp:371
// Definition: LockFreeBufferMisc.hpp:23
// Definition: Base.hpp:12
// Definition: LockFreeBufferMisc.hpp:89