hmbdc
simplify-high-performance-messaging-programming
RingBuffer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/pattern/MemRingBuffer.hpp"
5 
6 
7 namespace hmbdc { namespace pattern {
8 
9 namespace ringbuffer_detail {
10  template <typename T, typename Seq>
11  uint32_t itemSizePower2Num() {
12  uint32_t res = 1ul;
13  while ((1ul << res) < sizeof(T) + sizeof(Seq)) res++;
14  return res;
15  }
16 }
17 
18 template<typename T, uint32_t parallel_consumer_count>
19 class RingBuffer {
21  IMPL impl_;
22 public:
23  enum {PARALLEL_CONSUMER_COUNT = parallel_consumer_count};
24  const size_t CAPACITY;
25  using value_type = T;
26  // struct DeadConsumer : IMPL::DeadConsumer {using typename IMPL::DeadConsumer::DeadConsumer;};
27  using DeadConsumer = typename IMPL::DeadConsumer;
28 ///
29 /// a forward iterator to access elements
30 ///
31  struct iterator : std::iterator<std::forward_iterator_tag, T> {
32  friend class RingBuffer<T, parallel_consumer_count>;
33 
34  private:
35  using IMPL_IT = typename IMPL::iterator;
36  IMPL_IT impl_;
37 
38  explicit iterator(IMPL_IT const& impl)
39  : impl_(impl){}
40 
41  public:
42  iterator(){}
43  typename IMPL::Sequence seq() const {return impl_.seq();}
44 
45  iterator& operator ++() {
46  ++impl_;
47  return *this;
48  }
49 
50  iterator operator ++(int) {
51  iterator tmp = *this;
52  ++*this;
53  return tmp;
54  }
55 
56  iterator operator + (size_t dis) const {
57  IMPL_IT tmp = impl_;
58  tmp = tmp + dis;
59  return iterator(tmp);
60  }
61 
62  size_t operator - (iterator const& other) const {
63  return impl_ - other.impl_;
64  }
65 
66  explicit operator bool() const {
67  return (bool)impl_;
68  }
69 
70  bool operator == (iterator const& other) const {return impl_ == other.impl_;}
71  bool operator != (iterator const& other) const {return impl_ != other.impl_;}
72  // value_type const& operator*() const{return *reinterpret_cast<value_type const*>(*impl_);}
73  value_type& operator*(){return *reinterpret_cast<value_type *>(*impl_);}
74  value_type* operator->(){return reinterpret_cast<value_type *>(*impl_);}
75  // value_type const* operator->() const{return reinterpret_cast<value_type const*>(*impl_);}
76  };
77  RingBuffer(uint32_t ringSizePower2Num)
78  : impl_(ringbuffer_detail::itemSizePower2Num<T, typename IMPL::Sequence>(), ringSizePower2Num)
79  , CAPACITY(impl_.CAPACITY){}
80 
81 
82  void put(T const & item) {impl_.put(&item, sizeof(T));}
83  bool tryPut(T const & item) {return impl_.tryPut(&item, sizeof(T));}
84  void killPut(T const & item) {impl_.killPut(&item, sizeof(T));}
85  template <typename ApplyToKilled>
86  void screezeIn(T const & item, ApplyToKilled f) {impl_.screezeIn(&item, f, sizeof(T));}
87 
88  bool isFull() const {return impl_.isFull();}
89  iterator claim(){return iterator(impl_.claim());}
90  iterator tryClaim(){return iterator(impl_.tryClaim());}
91  iterator tryClaim(size_t n){return iterator(impl_.tryClaim(n));}
92  iterator claim(size_t n) {return iterator(impl_.claim(n));}
93  iterator killClaim() {return iterator(impl_.killClaim());}
94  iterator killClaim(size_t n) {return iterator(impl_.killClaim(n));}
95  void commit(iterator const& it){impl_.commit(it.impl_);}
96  void commit(iterator from, size_t n) {impl_.commit(from.impl_, n);}
97  void markDead(uint32_t parallel_consumer_index) {impl_.markDead(parallel_consumer_index);}
98  T take(uint32_t PARALLEL_CONSUMER_INDEX) {
99  T res;
100  impl_.take(PARALLEL_CONSUMER_INDEX, &res, sizeof(res));
101  return res;
102  }
103 
104  T takeReentrant(uint32_t PARALLEL_CONSUMER_INDEX) {
105  T res;
106  impl_.takeReentrant(PARALLEL_CONSUMER_INDEX, &res, sizeof(res));
107  return res;
108  }
109 
110  iterator peek(uint32_t PARALLEL_CONSUMER_INDEX) const {
111  return iterator(impl_.peek(PARALLEL_CONSUMER_INDEX));
112  }
113 
114  size_t peek(uint32_t PARALLEL_CONSUMER_INDEX, iterator& begin, iterator& end, size_t maxPeekSize = std::numeric_limits<size_t>::max()) const {
115  return impl_.peek(PARALLEL_CONSUMER_INDEX, begin.impl_, end.impl_,maxPeekSize);
116  }
117 
118  void waste(uint32_t PARALLEL_CONSUMER_INDEX, size_t size) { impl_.waste(PARALLEL_CONSUMER_INDEX, size); }
119  void wasteAfterPeek(uint32_t PARALLEL_CONSUMER_INDEX, size_t size) { impl_.wasteAfterPeek(PARALLEL_CONSUMER_INDEX, size); }
120  // // template <uint32_t PARALLEL_CONSUMER_INDEX>
121  // // uint64_t remainingSize() const { return impl_.remainingSize(PARALLEL_CONSUMER_INDEX); }
122  size_t remainingSize(uint32_t index) const { return impl_.remainingSize(index); }
123  size_t remainingSize() const { return impl_.remainingSize(); }
124  void reset(uint32_t PARALLEL_CONSUMER_INDEX) { impl_.reset(PARALLEL_CONSUMER_INDEX); }
125  uint64_t purge() { return impl_.purge(); }
126 };
127 
128 template<typename T>
129 class RingBuffer<T, 0> {
130  using IMPL = MemRingBuffer<0>;
131  IMPL impl_;
132 public:
133  const size_t CAPACITY;
134  using value_type = T;
135 ///
136 /// a forward iterator to access elements
137 ///
138  struct iterator : std::iterator<std::forward_iterator_tag, T> {
139  friend class RingBuffer<T, 0>;
140 
141  private:
142  using IMPL_IT = typename IMPL::iterator;
143  IMPL_IT impl_;
144 
145  explicit iterator(IMPL_IT const& impl)
146  : impl_(impl){}
147 
148  public:
149  iterator(){}
150  typename IMPL::Sequence seq() const {return impl_.seq();}
151 
152  iterator& operator ++() {
153  ++impl_;
154  return *this;
155  }
156 
157  iterator operator ++(int) {
158  iterator tmp = *this;
159  ++*this;
160  return tmp;
161  }
162 
163  iterator operator + (size_t dis) const {
164  IMPL_IT tmp = impl_;
165  tmp = tmp + dis;
166  return iterator(tmp);
167  }
168 
169  size_t operator - (iterator const& other) const {
170  return impl_ - other.impl_;
171  }
172 
173  explicit operator bool() const {
174  return (bool)impl_;
175  }
176 
177  bool operator == (iterator const& other) const {return impl_ == other.impl_;}
178  bool operator != (iterator const& other) const {return impl_ != other.impl_;}
179  // value_type const& operator*() const{return *reinterpret_cast<value_type const*>(*impl_);}
180  value_type& operator*(){return *reinterpret_cast<value_type *>(*impl_);}
181  value_type* operator->(){return reinterpret_cast<value_type *>(*impl_);}
182  // value_type const* operator->() const{return reinterpret_cast<value_type const*>(*impl_);}
183  };
184  RingBuffer(uint32_t ringSizePower2Num)
185  : impl_(ringbuffer_detail::itemSizePower2Num<T, typename IMPL::Sequence>(), ringSizePower2Num)
186  , CAPACITY(impl_.CAPACITY){}
187 
188 
189  void put(T const & item) {impl_.put(&item, sizeof(T));}
190  bool tryPut(T const & item) {return impl_.tryPut(&item, sizeof(T));}
191 
192  bool isFull() const {return impl_.isFull();}
193  iterator claim(){return iterator(impl_.claim());}
194  iterator tryClaim(){return iterator(impl_.tryClaim());}
195  iterator tryClaim(size_t n){return iterator(impl_.tryClaim(n));}
196  iterator claim(size_t n) {return iterator(impl_.claim(n));}
197  void commit(iterator const& it){impl_.commit(it.impl_);}
198  void commit(iterator from, size_t n) {impl_.commit(from.impl_, n);}
199  T take() {
200  T res;
201  impl_.take(&res, sizeof(res));
202  return res;
203  }
204 
205  bool tryTake(T& res) {
206  return impl_.tryTake(&res, sizeof(res));
207  }
208 
209  iterator peek() {
210  return iterator(impl_.peek());
211  }
212 
213  size_t peek(iterator& begin, iterator& end) {
214  return impl_.peek(begin.impl_, end.impl_);
215  }
216 
217  size_t peekAll(iterator& begin, iterator& end) {
218  return impl_.peekAll(begin.impl_, end.impl_);
219  }
220 
221  void wasteAfterPeek(iterator begin, size_t n, bool incomplete = false) {
222  impl_.wasteAfterPeek(begin.impl_, n, incomplete);
223  }
224 
225  size_t remainingSize() const { return impl_.remainingSize(); }
226  void reset() { impl_.reset(); }
227 };
228 
229 }} // end namespace hmbdc::pattern
230 
void wasteAfterPeek(uint16_t PARALLEL_CONSUMER_INDEX, size_t size) HMBDC_RESTRICT
for batching process, mark the next items consumed
Definition: MemRingBuffer.hpp:371
Definition: LockFreeBufferMisc.hpp:23
Definition: RingBuffer.hpp:19
Definition: RingBuffer.hpp:31
Definition: Base.hpp:12
uint64_t purge()
call this peroidically to mark the stuck consumers dead
Definition: MemRingBuffer.hpp:431
Definition: LockFreeBufferMisc.hpp:89