hmbdc - simplify high performance messaging programming
MemRingBuffer.hpp
#include "hmbdc/Copyright.hpp"
#pragma once
#include "hmbdc/pattern/LockFreeBufferMisc.hpp"
#include "hmbdc/Exception.hpp"
#include "hmbdc/Compile.hpp"

#include <boost/smart_ptr/detail/yield_k.hpp>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <iterator>
#include <thread>
#include <vector>
#include <limits>
#include <algorithm>
#include <mutex>

#ifndef HMBDC_YIELD
#define HMBDC_YIELD(x) boost::detail::yield(x)
#endif
19 
20 namespace hmbdc { namespace pattern {
21 
22 namespace memringbuffer_detail {
23 using namespace std;
24 struct my_spin_lock : std::atomic_flag {
25  my_spin_lock()
26  : std::atomic_flag{0} {
27  //clang has ATOMIC_FLAG_INIT as {0} not 0, so ...
28  constexpr int f[] = ATOMIC_FLAG_INIT;
29  static_assert(0 == f[0], "");
30  }
31  void lock() {
32  while (test_and_set(std::memory_order_acquire)) {
33  std::this_thread::yield();
34  }
35  }
36 
37  void unlock() {
38  clear(std::memory_order_release);
39  }
40 };
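
// Illustrative note (not part of the original header): my_spin_lock models the
// standard BasicLockable requirements via lock()/unlock(), so it composes with
// the usual RAII guards; takeReentrant() below relies on exactly that:
//
//   my_spin_lock lk;
//   {
//       std::lock_guard<my_spin_lock> g(lk); // spins (yielding) until acquired
//       /* critical section */
//   }                                        // released on scope exit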

template<uint16_t parallel_consumer_count, typename SeqT = size_t>
class MemRingBuffer {
public:
    using Sequence = SeqT;
    enum {PARALLEL_CONSUMER_COUNT = parallel_consumer_count,};

    const size_t CAPACITY;
    const size_t VALUE_TYPE_SIZE;
private:

    const Sequence READ_SEQ_MAX;
    const Sequence MASK;
    lf_misc::chunk_base_ptr<SeqT> buffer_; //ring storage: slots plus per-slot sequences

    Sequence toBeClaimedSeq_
        __attribute__((__aligned__(SMP_CACHE_BYTES)));
57  __attribute__((__aligned__(SMP_CACHE_BYTES)));
58  Sequence readSeq_[PARALLEL_CONSUMER_COUNT]
59  __attribute__((__aligned__(SMP_CACHE_BYTES)));
60  Sequence readSeqLastPurge_[PARALLEL_CONSUMER_COUNT]
61  __attribute__((__aligned__(SMP_CACHE_BYTES)));
62 
63  my_spin_lock locks_[PARALLEL_CONSUMER_COUNT];
64 
    inline __attribute__ ((always_inline))
    Sequence readSeqLow() const HMBDC_RESTRICT {
        Sequence res = readSeq_[0];
        for (uint16_t i = 1;
            i < PARALLEL_CONSUMER_COUNT; ++i)
            if (res > readSeq_[i]) res = readSeq_[i];
        return res;
    }

    inline __attribute__ ((always_inline))
    uint16_t findSlowestReader() const HMBDC_RESTRICT {
        Sequence smallest = readSeq_[0];
        uint16_t smallestLoc = 0;
        for (uint16_t i = 1; i < PARALLEL_CONSUMER_COUNT; ++i)
            if (smallest > readSeq_[i]) {
                smallest = readSeq_[i];
                smallestLoc = i;
            }
        return smallestLoc;
    }

public:
    using value_type = void *;
    using iterator = lf_misc::iterator<SeqT>;

    static size_t footprint(uint32_t valueTypeSizePower2Num, uint32_t ringSizePower2Num) {
        return sizeof(MemRingBuffer) + decltype(buffer_)::footprint(
            valueTypeSizePower2Num, ringSizePower2Num) + SMP_CACHE_BYTES;
    }

    template <typename Allocator = os::DefaultAllocator>
    MemRingBuffer(uint32_t valueTypeSizePower2Num, uint32_t ringSizePower2Num
        , Allocator& allocator = os::DefaultAllocator::instance())
    : CAPACITY(1u << ringSizePower2Num)
    , VALUE_TYPE_SIZE((1u << valueTypeSizePower2Num) - sizeof(Sequence))
    , READ_SEQ_MAX(numeric_limits<Sequence>::max() - CAPACITY - 1000u)
    //1000 is a safe yet small margin for however many threads implement a single consumer
    , MASK(CAPACITY - 1)
    , buffer_(valueTypeSizePower2Num, ringSizePower2Num
        , (allocator))
    , toBeClaimedSeq_(0u) {
        fill_n(readSeq_, (int)PARALLEL_CONSUMER_COUNT, 0);
        fill_n(readSeqLastPurge_, (int)PARALLEL_CONSUMER_COUNT, READ_SEQ_MAX);
        for (auto i = CAPACITY; i != 0 ; --i) {
            *buffer_.getSeq(i - 1) = numeric_limits<Sequence>::max();
        }
    }

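    // Sizing sketch (illustrative only; the numbers assume a 64-bit build where
    // sizeof(Sequence) == 8):
    //
    //   MemRingBuffer<4> rb(7u, 10u);
    //   // rb.CAPACITY        == (1u << 10) == 1024 slots
    //   // rb.VALUE_TYPE_SIZE == (1u << 7) - sizeof(Sequence) == 120 usable bytes
    //   //
    //   // footprint(7u, 10u) reports the total bytes such a buffer needs, which
    //   // helps when a custom Allocator places it in pre-allocated memory
    //   // (for example a shared memory region).
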
    void put(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        // Sequence seq = __sync_fetch_and_add(&toBeClaimedSeq_, 1);
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        for (uint32_t k = 0;
            seq >= CAPACITY + readSeqLow();
            ++k) {
            HMBDC_YIELD(k);
        }
        size_t index = seq & MASK;
        memcpy(buffer_ + index
            , item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        *buffer_.getSeq(index) = seq;
    }

    bool tryPut(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for(auto seq = toBeClaimedSeq_;
            seq < CAPACITY + readSeqLow();
            seq = toBeClaimedSeq_) {
            // if (hmbdc_likely(__sync_bool_compare_and_swap(&toBeClaimedSeq_, seq, seq + 1))) {
            if (hmbdc_likely(__atomic_compare_exchange_n (
                &toBeClaimedSeq_, &seq, seq + 1, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                size_t index = seq & MASK;
                memcpy(buffer_ + index
                    , item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
                // __sync_synchronize();
                __atomic_thread_fence(__ATOMIC_ACQUIRE);
                *buffer_.getSeq(index) = seq;
                return true;
            }
        }
        return false;
    }

    void killPut(void const* HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        // Sequence seq = __sync_fetch_and_add(&toBeClaimedSeq_, 1);
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        while (seq >= CAPACITY + readSeqLow()) {
            uint16_t slowLoc = findSlowestReader();
            markDead(slowLoc);
        }

        size_t index = seq & MASK;
        memcpy(buffer_ + index, item, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        *buffer_.getSeq(index) = seq;
    }

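    // The three producer-side entry points above differ only in their backpressure
    // policy; an illustrative sketch (rb and msg are hypothetical):
    //
    //   rb.put(&msg);            // spins/yields until a slot frees up
    //   if (!rb.tryPut(&msg)) {  // never blocks; reports failure instead
    //       /* drop or retry later */
    //   }
    //   rb.killPut(&msg);        // never blocks; marks the slowest consumer(s)
    //                            // dead until enough space is reclaimed
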
    bool isFull() const {
        return toBeClaimedSeq_ >= CAPACITY + readSeqLow();
    }

    Sequence readSeq(uint16_t PARALLEL_CONSUMER_INDEX) const HMBDC_RESTRICT {
        return readSeq_[PARALLEL_CONSUMER_INDEX];
    }

    iterator claim() HMBDC_RESTRICT {
        // Sequence seq = __sync_fetch_and_add(&toBeClaimedSeq_, 1);
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        for (uint32_t k = 0;
            seq >= CAPACITY + readSeqLow();
            ++k) {
            HMBDC_YIELD(k);
        }
        return iterator(buffer_, seq);
    }

    iterator tryClaim() HMBDC_RESTRICT {
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for(auto seq = toBeClaimedSeq_;
            seq < CAPACITY + readSeqLow();
            seq = toBeClaimedSeq_) {
            // if (hmbdc_likely(__sync_bool_compare_and_swap(&toBeClaimedSeq_, seq, seq + 1))) {
            if (hmbdc_likely(__atomic_compare_exchange_n (
                &toBeClaimedSeq_, &seq, seq + 1, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                return iterator(buffer_, seq);
            }
        }
        return iterator();
    }

    /**
     * @brief claim slots in the ring buffer for writing; returns an empty iterator
     * when that is not possible - this call does not block
     *
     * @details the user needs to call commit after being done with the slots
     * @param n number of slots to claim
     * @return iterator pointing to the start of the claimed slots, or an empty iterator when not possible
     */
    iterator tryClaim(size_t n) HMBDC_RESTRICT {
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for(auto seq = toBeClaimedSeq_;
            seq + n - 1 < CAPACITY + readSeqLow();
            seq = toBeClaimedSeq_) {
            // if (hmbdc_likely(__sync_bool_compare_and_swap(&toBeClaimedSeq_, seq, seq + n))) {
            if (hmbdc_likely(__atomic_compare_exchange_n (
                &toBeClaimedSeq_, &seq, seq + n, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
                return iterator(buffer_, seq);
            }
        }
        return iterator();
    }

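    // Claim/commit sketch for filling slots directly rather than copying through
    // put() (illustrative only; rb and msgs are hypothetical):
    //
    //   auto it = rb.claim(3);                       // claim(n) below blocks until
    //   auto cur = it;                               // 3 slots are writable
    //   for (int i = 0; i < 3; ++i, ++cur) {
    //       ::memcpy(*cur, &msgs[i], sizeof(msgs[i])); // *cur is the slot's memory
    //   }
    //   rb.commit(it, 3);                            // publish all 3 slots to consumers
    //
    //   // tryClaim(3) is the non-blocking variant: it hands back an empty
    //   // iterator instead of waiting when the slots are not yet available.
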
    iterator claim(size_t n) HMBDC_RESTRICT {
        // Sequence seq = __sync_fetch_and_add(&toBeClaimedSeq_, n);
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, n, __ATOMIC_RELAXED);
        for (uint32_t k = 0;
            seq + n > CAPACITY + readSeqLow();
            ++k) {
            HMBDC_YIELD(k);
        }
        return iterator(buffer_, seq);
    }

    iterator killClaim() HMBDC_RESTRICT {
        // Sequence seq = __sync_fetch_and_add(&toBeClaimedSeq_, 1);
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, 1, __ATOMIC_RELAXED);
        while (seq >= CAPACITY + readSeqLow()) {
            uint16_t slowLoc = findSlowestReader();
            markDead(slowLoc);
        }
        return iterator(buffer_, seq);
    }

    iterator killClaim(size_t n) HMBDC_RESTRICT {
        // Sequence seq = __sync_fetch_and_add(&toBeClaimedSeq_, n);
        Sequence seq = __atomic_fetch_add(&toBeClaimedSeq_, n, __ATOMIC_RELAXED);
        while (seq + n > CAPACITY + readSeqLow()) {
            uint16_t slowLoc = findSlowestReader();
            markDead(slowLoc);
        }
        return iterator(buffer_, seq);
    }

    void commit(iterator it) HMBDC_RESTRICT {
        __sync_synchronize();
        // __atomic_thread_fence(__ATOMIC_ACQUIRE);
        *buffer_.getSeq(*it - buffer_) = it.seq_;
    }

    void commit(iterator from, size_t n) HMBDC_RESTRICT {
        __sync_synchronize();
        // __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (size_t i = 0; i < n; ++i) {
            *buffer_.getSeq(*from - buffer_) = from.seq_;
            ++from;
        }
    }

    void markDead(uint16_t parallel_consumer_index) HMBDC_RESTRICT {
        if (parallel_consumer_index < PARALLEL_CONSUMER_COUNT) {
            readSeq_[parallel_consumer_index] = READ_SEQ_MAX;
            __sync_synchronize();
            // __atomic_thread_fence(__ATOMIC_RELEASE);
        }
    }

    vector<uint16_t>
    unusedConsumerIndexes() const {
        vector<uint16_t> res;
        for (uint16_t i = 0; i < PARALLEL_CONSUMER_COUNT; ++i) {
            if (readSeq_[i] == READ_SEQ_MAX) {
                res.push_back(i);
            }
        }
        return res;
    }

    void take(uint16_t PARALLEL_CONSUMER_INDEX, void * HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        auto seq = readSeq_[PARALLEL_CONSUMER_INDEX];
        if (hmbdc_unlikely(seq >= READ_SEQ_MAX)) {
            HMBDC_THROW(DeadConsumer, PARALLEL_CONSUMER_INDEX);
        }
        size_t index = seq & MASK;
        for (uint32_t k = 0;
            seq != *buffer_.getSeq(index);
            ++k) {
            HMBDC_YIELD(k);
        }

        memcpy(item, buffer_ + index, sizeHint ? sizeHint : VALUE_TYPE_SIZE);
        // __sync_fetch_and_add(readSeq_ + PARALLEL_CONSUMER_INDEX, 1);
        __atomic_fetch_add(readSeq_ + PARALLEL_CONSUMER_INDEX, 1, __ATOMIC_RELEASE);
    }

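    // End-to-end sketch of the copy-based path (illustrative only; Msg, rb and
    // the chosen sizes are hypothetical; 56 assumes sizeof(Sequence) == 8):
    //
    //   struct Msg { char payload[56]; };
    //   MemRingBuffer<1> rb(6u, 10u);      // 1 consumer, 2^6-byte slots, 2^10 slots
    //
    //   Msg in, out;
    //   rb.put(&in, sizeof(in));           // producer: copy in, block while full
    //   rb.take(0, &out, sizeof(out));     // consumer 0: block until a slot is ready
    //
    //   // takeReentrant(0, ...) below is the same operation guarded by a spin
    //   // lock, for when several threads drain the same consumer index.
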
    void takeReentrant(uint16_t PARALLEL_CONSUMER_INDEX, void * HMBDC_RESTRICT item, size_t sizeHint = 0) HMBDC_RESTRICT {
        lock_guard<my_spin_lock> guard(locks_[PARALLEL_CONSUMER_INDEX]);
        take(PARALLEL_CONSUMER_INDEX, item, sizeHint);
    }

    iterator peek(uint16_t PARALLEL_CONSUMER_INDEX) const HMBDC_RESTRICT {
        auto readSeq = readSeq_[PARALLEL_CONSUMER_INDEX];
        if (hmbdc_unlikely(readSeq >= READ_SEQ_MAX)) {
            HMBDC_THROW(DeadConsumer, PARALLEL_CONSUMER_INDEX);
        }
        if (readSeq == *buffer_.getSeq(readSeq & MASK)) {
            return iterator(buffer_, readSeq);
        }
        return iterator();
    }

    size_t peek(uint16_t PARALLEL_CONSUMER_INDEX, iterator& begin, iterator& end
        , size_t maxPeekSize = numeric_limits<size_t>::max()) const {
        Sequence readSeq = readSeq_[PARALLEL_CONSUMER_INDEX];
        if (hmbdc_unlikely(readSeq >= READ_SEQ_MAX)) {
            HMBDC_THROW(DeadConsumer, PARALLEL_CONSUMER_INDEX);
        }
        begin = iterator(buffer_, readSeq);
        while (readSeq == *buffer_.getSeq(readSeq & MASK)
            && maxPeekSize--) {
            ++readSeq;
        }
        end = iterator(buffer_, readSeq);
        return readSeq - readSeq_[PARALLEL_CONSUMER_INDEX];
    }

    size_t peekSome(uint16_t PARALLEL_CONSUMER_INDEX, iterator& begin, iterator& end
        , size_t maxPeekSize = numeric_limits<size_t>::max()) const {
        size_t res;
        for (uint32_t k = 0;
            !(res = peek(PARALLEL_CONSUMER_INDEX, begin, end, maxPeekSize))
            && k < 64;
            ++k) {
            HMBDC_YIELD(k);
        }
        return res;
    }

    void waste(uint16_t PARALLEL_CONSUMER_INDEX, size_t size) HMBDC_RESTRICT {
        Sequence seq = readSeq_[PARALLEL_CONSUMER_INDEX];
        if (hmbdc_unlikely(seq >= READ_SEQ_MAX)) {
            HMBDC_THROW(DeadConsumer, PARALLEL_CONSUMER_INDEX);
        }
        for (uint32_t k = 0;
            seq + size > toBeClaimedSeq_;
            ++k) {
            HMBDC_YIELD(k);
        }
        // __sync_fetch_and_add(readSeq_ + PARALLEL_CONSUMER_INDEX, size);
        __atomic_fetch_add(readSeq_ + PARALLEL_CONSUMER_INDEX, size, __ATOMIC_RELEASE);
    }

    /**
     * @brief for batch processing, mark the next items as consumed
     * @details faster than waste(); only use it with a size no larger than
     * what peek just returned, otherwise the behavior is undefined
     *
     * @param PARALLEL_CONSUMER_INDEX identifying which parallel consumer
     * @param size consume count
     */
    void wasteAfterPeek(uint16_t PARALLEL_CONSUMER_INDEX, size_t size) HMBDC_RESTRICT
    {
        if (!size) return;
        // __sync_fetch_and_add(readSeq_ + PARALLEL_CONSUMER_INDEX, size);
        __atomic_fetch_add(readSeq_ + PARALLEL_CONSUMER_INDEX, size, __ATOMIC_RELEASE);
    }

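    // Batch-consume sketch combining peek() and wasteAfterPeek() (illustrative
    // only; consumer index 0, rb and handle() are hypothetical):
    //
    //   MemRingBuffer<1>::iterator b, e;
    //   if (size_t n = rb.peek(0, b, e)) {    // n items ready, spanning [b, e)
    //       for (size_t i = 0; i < n; ++i, ++b) handle(*b);
    //       rb.wasteAfterPeek(0, n);          // release exactly what peek reported
    //   }
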
    Sequence catchUpWith(uint16_t PARALLEL_CONSUMER_INDEX, uint16_t WITH_PARALLEL_CONSUMER_INDEX) {
        readSeq_[PARALLEL_CONSUMER_INDEX] = readSeq_[WITH_PARALLEL_CONSUMER_INDEX];
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_ACQ_REL);
        return readSeq_[PARALLEL_CONSUMER_INDEX];
    }

    void catchUpTo(uint16_t PARALLEL_CONSUMER_INDEX, Sequence newSeq) {
        if (readSeq_[PARALLEL_CONSUMER_INDEX] <= newSeq) {
            readSeq_[PARALLEL_CONSUMER_INDEX] = newSeq;
        }
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_RELEASE);
    }

    size_t remainingSize(uint16_t index) const HMBDC_RESTRICT {
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        Sequence r = readSeq_[index];
        Sequence w = toBeClaimedSeq_;
        return w > r ? w - r : 0;
    }

    size_t remainingSize() const HMBDC_RESTRICT {
        // __sync_synchronize();
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        Sequence r = readSeqLow();
        Sequence w = toBeClaimedSeq_;
        return w > r ? w - r : 0;
    }

    void reset(uint16_t PARALLEL_CONSUMER_INDEX) {
        size_t index;
        do {
            readSeq_[PARALLEL_CONSUMER_INDEX] = toBeClaimedSeq_;
            index = readSeq_[PARALLEL_CONSUMER_INDEX] & MASK;
            // __sync_synchronize();
            __atomic_thread_fence(__ATOMIC_ACQ_REL);
        } while (*buffer_.getSeq(index) == readSeq_[PARALLEL_CONSUMER_INDEX]);
    }

    size_t parallelConsumerAlive() const {
        return count_if(readSeq_, readSeq_ + PARALLEL_CONSUMER_COUNT
            , [m = READ_SEQ_MAX](Sequence s) {
                return s < m;
            }
        );
    }

    /**
     * @brief call this periodically to mark the stuck consumers dead
     * @details the caller needs to make sure there are new messages between calls
     * @return mask indicating which consumers were marked dead
     */
    uint64_t purge() {
        __sync_synchronize();
        uint64_t res = 0;
        for (uint16_t i = 0; i < PARALLEL_CONSUMER_COUNT; ++i) {
            auto seq = readSeq_[i];
            if (seq < READ_SEQ_MAX) {
                size_t index = seq & MASK;
                if (hmbdc_unlikely(readSeqLastPurge_[i] == seq)) {
                    if (i == findSlowestReader()) {
                        res |= (1ul << i);
                    }
                } else if (hmbdc_unlikely(seq == *buffer_.getSeq(index))) {
                    readSeqLastPurge_[i] = seq;
                }
            }
        }
        for (uint16_t i = 0; res && i < PARALLEL_CONSUMER_COUNT; ++i) {
            if (res & (1ul << i)) markDead(i);
        }
        return res;
    }
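
    // Housekeeping sketch: a monitoring thread may call purge() on an interval and
    // inspect the returned mask (illustrative only; running, rb and the one-second
    // period are hypothetical):
    //
    //   while (running) {
    //       std::this_thread::sleep_for(std::chrono::seconds(1));
    //       uint64_t dead = rb.purge();
    //       // bit i set in dead => parallel consumer i was stuck and is now marked dead
    //   }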
};

} //memringbuffer_detail

}} // end namespace hmbdc::pattern

#include "hmbdc/pattern/MemRingBuffer2.hpp"

namespace hmbdc { namespace pattern {
template<uint16_t PARALLEL_CONSUMER_COUNT, typename SeqT = size_t>
using MemRingBuffer = memringbuffer_detail::MemRingBuffer<PARALLEL_CONSUMER_COUNT, SeqT>;
}}