1 #include "hmbdc/Copyright.hpp" 4 #include "hmbdc/pattern/MemRingBuffer.hpp" 7 namespace hmbdc {
namespace pattern {
9 namespace ringbuffer_detail {
10 template <
typename T,
typename Seq>
11 uint32_t itemSizePower2Num() {
13 while ((1ul << res) <
sizeof(T) +
sizeof(Seq)) res++;
18 template<
typename T, u
int32_t parallel_consumer_count>
23 enum {PARALLEL_CONSUMER_COUNT = parallel_consumer_count};
24 const size_t CAPACITY;
31 struct iterator : std::iterator<std::forward_iterator_tag, T> {
32 friend class RingBuffer<T, parallel_consumer_count>;
38 explicit iterator(IMPL_IT
const& impl)
43 typename IMPL::Sequence seq()
const {
return impl_.seq();}
56 iterator operator + (
size_t dis)
const {
62 size_t operator - (
iterator const& other)
const {
63 return impl_ - other.impl_;
66 explicit operator bool()
const {
70 bool operator == (
iterator const& other)
const {
return impl_ == other.impl_;}
71 bool operator != (
iterator const& other)
const {
return impl_ != other.impl_;}
73 value_type& operator*(){
return *
reinterpret_cast<value_type *
>(*impl_);}
74 value_type* operator->(){
return reinterpret_cast<value_type *
>(*impl_);}
78 : impl_(ringbuffer_detail::itemSizePower2Num<T, typename
IMPL::Sequence>(), ringSizePower2Num)
79 , CAPACITY(impl_.CAPACITY){}
82 void put(T
const & item) {impl_.put(&item,
sizeof(T));}
83 bool tryPut(T
const & item) {
return impl_.tryPut(&item,
sizeof(T));}
84 void killPut(T
const & item) {impl_.killPut(&item,
sizeof(T));}
85 template <
typename ApplyToKilled>
86 void screezeIn(T
const & item, ApplyToKilled f) {impl_.screezeIn(&item, f,
sizeof(T));}
88 bool isFull()
const {
return impl_.isFull();}
89 iterator claim(){
return iterator(impl_.claim());}
90 iterator tryClaim(){
return iterator(impl_.tryClaim());}
91 iterator tryClaim(
size_t n){
return iterator(impl_.tryClaim(n));}
92 iterator claim(
size_t n) {
return iterator(impl_.claim(n));}
93 iterator killClaim() {
return iterator(impl_.killClaim());}
94 iterator killClaim(
size_t n) {
return iterator(impl_.killClaim(n));}
95 void commit(iterator
const& it){impl_.commit(it.impl_);}
96 void commit(iterator from,
size_t n) {impl_.commit(from.impl_, n);}
97 void markDead(uint32_t parallel_consumer_index) {impl_.markDead(parallel_consumer_index);}
98 T take(uint32_t PARALLEL_CONSUMER_INDEX) {
100 impl_.take(PARALLEL_CONSUMER_INDEX, &res,
sizeof(res));
104 T takeReentrant(uint32_t PARALLEL_CONSUMER_INDEX) {
106 impl_.takeReentrant(PARALLEL_CONSUMER_INDEX, &res,
sizeof(res));
110 iterator peek(uint32_t PARALLEL_CONSUMER_INDEX)
const {
111 return iterator(impl_.peek(PARALLEL_CONSUMER_INDEX));
114 size_t peek(uint32_t PARALLEL_CONSUMER_INDEX, iterator& begin, iterator& end,
size_t maxPeekSize = std::numeric_limits<size_t>::max())
const {
115 return impl_.peek(PARALLEL_CONSUMER_INDEX, begin.impl_, end.impl_,maxPeekSize);
118 void waste(uint32_t PARALLEL_CONSUMER_INDEX,
size_t size) { impl_.waste(PARALLEL_CONSUMER_INDEX, size); }
119 void wasteAfterPeek(uint32_t PARALLEL_CONSUMER_INDEX,
size_t size) { impl_.
wasteAfterPeek(PARALLEL_CONSUMER_INDEX, size); }
122 size_t remainingSize(uint32_t index)
const {
return impl_.remainingSize(index); }
123 size_t remainingSize()
const {
return impl_.remainingSize(); }
124 void reset(uint32_t PARALLEL_CONSUMER_INDEX) { impl_.reset(PARALLEL_CONSUMER_INDEX); }
125 uint64_t purge() {
return impl_.
purge(); }
133 const size_t CAPACITY;
134 using value_type = T;
138 struct iterator : std::iterator<std::forward_iterator_tag, T> {
145 explicit iterator(IMPL_IT
const& impl)
150 typename IMPL::Sequence seq()
const {
return impl_.seq();}
163 iterator operator + (
size_t dis)
const {
169 size_t operator - (
iterator const& other)
const {
170 return impl_ - other.impl_;
173 explicit operator bool()
const {
177 bool operator == (
iterator const& other)
const {
return impl_ == other.impl_;}
178 bool operator != (
iterator const& other)
const {
return impl_ != other.impl_;}
180 value_type& operator*(){
return *
reinterpret_cast<value_type *
>(*impl_);}
181 value_type* operator->(){
return reinterpret_cast<value_type *
>(*impl_);}
185 : impl_(ringbuffer_detail::itemSizePower2Num<T, typename
IMPL::Sequence>(), ringSizePower2Num)
186 , CAPACITY(impl_.CAPACITY){}
189 void put(T
const & item) {impl_.put(&item,
sizeof(T));}
190 bool tryPut(T
const & item) {
return impl_.tryPut(&item,
sizeof(T));}
192 bool isFull()
const {
return impl_.isFull();}
193 iterator claim(){
return iterator(impl_.claim());}
194 iterator tryClaim(){
return iterator(impl_.tryClaim());}
195 iterator tryClaim(
size_t n){
return iterator(impl_.tryClaim(n));}
196 iterator claim(
size_t n) {
return iterator(impl_.claim(n));}
197 void commit(iterator
const& it){impl_.commit(it.impl_);}
198 void commit(iterator from,
size_t n) {impl_.commit(from.impl_, n);}
201 impl_.take(&res,
sizeof(res));
205 bool tryTake(T& res) {
206 return impl_.tryTake(&res,
sizeof(res));
210 return iterator(impl_.peek());
213 size_t peek(iterator& begin, iterator& end) {
214 return impl_.peek(begin.impl_, end.impl_);
217 size_t peekAll(iterator& begin, iterator& end) {
218 return impl_.peekAll(begin.impl_, end.impl_);
221 void wasteAfterPeek(iterator begin,
size_t n,
bool incomplete =
false) {
225 size_t remainingSize()
const {
return impl_.remainingSize(); }
226 void reset() { impl_.reset(); }
void wasteAfterPeek(uint16_t PARALLEL_CONSUMER_INDEX, size_t size) HMBDC_RESTRICT
for batch processing, mark the next items as consumed
Definition: MemRingBuffer.hpp:371
Definition: LockFreeBufferMisc.hpp:23
Definition: MemRingBuffer.hpp:43
Definition: RingBuffer.hpp:19
Definition: RingBuffer.hpp:31
uint64_t purge()
call this periodically to mark the stuck consumers dead
Definition: MemRingBuffer.hpp:431
Definition: LockFreeBufferMisc.hpp:89