1 #include "hmbdc/Copyright.hpp"
3 #include "hmbdc/Compile.hpp"
4 #include "hmbdc/time/Time.hpp"
5 #include "hmbdc/Config.hpp"
7 #include <condition_variable>
10 namespace hmbdc {
namespace pattern {
12 namespace blocking_buffer_detail {
18 using value_type =
void *;
24 value_type getItem(
size_t seq) {
25 return store_ + seq % capacity_ * maxItemSize_;
28 size_t maxItemSize()
const {
return maxItemSize_;}
29 size_t capacity()
const {
return capacity_;}
30 void put(
void const* item,
size_t sizeHint = 0) {
32 unique_lock<mutex> lck(mutex_);
33 hasSlot_.wait(lck, [
this]{
return capacity_ > size_;});
34 memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
37 hasItem_.notify_one();
41 template <
typename T,
typename... Ts>
43 put(T&& item, Ts&&... items) {
45 unique_lock<mutex> lck(mutex_);
46 hasSlot_.wait(lck, [
this]{
return capacity_ > size_ +
sizeof...(Ts);});
47 fill(std::forward<T>(item), std::forward<Ts>(items)...);
48 size_ +=
sizeof...(items) + 1;
49 if (size_ ==
sizeof...(items) + 1) {
51 hasItem_.notify_one();
55 template <
typename It>
57 tryPutBatch(It begin,
size_t n) {
59 unique_lock<mutex> lck(mutex_);
60 if (hasSlot_.wait_for(
61 lck, chrono::seconds(0), [
this, n]{
return capacity_ > size_ + n - 1;})) {
66 hasItem_.notify_all();
73 template <
typename Item,
typename It>
75 tryPutBatchInPlace(It begin,
size_t n) {
77 unique_lock<mutex> lck(mutex_);
78 if (hasSlot_.wait_for(
79 lck, chrono::seconds(0), [
this, n]{
return capacity_ > size_ + n - 1;})) {
80 fillBatchInPlace<Item>(begin, n);
84 hasItem_.notify_all();
92 template <
typename T>
void putSome(T
const& item) {put(&item);}
93 template <
typename T,
typename ...Args>
94 void putInPlace(Args&&... args) {
96 unique_lock<mutex> lck(mutex_);
97 hasSlot_.wait(lck, [
this](){
return capacity_ > size_;});
98 new (getItem(seq_++)) T(std::forward<Args>(args)...);
101 hasItem_.notify_one();
104 template <
typename T,
typename ...Args>
105 bool tryPutInPlace(Args&&... args) {
107 unique_lock<mutex> lck(mutex_);
108 if (!hasSlot_.wait_for(lck, chrono::seconds(0), [
this](){
return capacity_ > size_;}))
110 new (getItem(seq_++)) T(std::forward<Args>(args)...);
113 hasItem_.notify_one();
118 bool tryPut(
void const* item,
size_t sizeHint = 0
119 , time::Duration timeout = time::Duration::seconds(0)) {
121 unique_lock<mutex> lck(mutex_);
122 if (!hasSlot_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds())
123 , [
this](){
return capacity_ > size_;}))
125 memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
128 hasItem_.notify_one();
133 template <
typename T>
134 bool tryPut(T
const& item
135 , time::Duration timeout = time::Duration::seconds(0)) {
136 return tryPut(&item,
sizeof(item), timeout);
139 bool isFull()
const {
return size_ == capacity_;}
140 void take(
void *dest,
size_t sizeHint = 0) {
142 unique_lock<mutex> lck(mutex_);
143 hasItem_.wait(lck, [
this](){
return size_;});
144 memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
145 if (size_-- == capacity_) {
147 hasSlot_.notify_all();
151 template <
typename T>
153 take(&dest,
sizeof(T));
157 bool tryTake(
void *dest,
size_t sizeHint = 0, time::Duration timeout = time::Duration::seconds(0)) {
159 unique_lock<mutex> lck(mutex_);
160 if (!hasItem_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds()), [
this](){
return size_;}))
return false;
161 memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
162 if (size_-- == capacity_) {
164 hasSlot_.notify_all();
169 template <
typename T>
170 bool tryTake(T& dest) {
171 return tryTake(&dest,
sizeof(T));
174 template <
typename itOut>
175 size_t take(itOut b, itOut e) {
177 unique_lock<mutex> lck(mutex_);
178 hasItem_.wait(lck, [
this](){
return size_;});
180 while (s && b != e) {
181 memcpy(&*b++, getItem(seq_ - s--), min(maxItemSize_,
sizeof(*b)));
183 auto ret = size_ - s;
185 if (size_ + ret == capacity_) {
187 hasSlot_.notify_all();
193 size_t peek(iterator& b, iterator& e);
194 void wasteAfterPeek(
size_t len) {
195 std::unique_lock<std::mutex> lck(mutex_);
196 if (size_ == capacity_) hasSlot_.notify_all();
200 void waitItem(time::Duration timeout) {
202 unique_lock<mutex> lck(mutex_);
203 hasItem_.wait_for(lck, std::chrono::nanoseconds(timeout.nanoseconds())
204 , [
this](){
return size_;});
207 size_t remainingSize()
const {
213 hasSlot_.notify_all();
218 template <
typename T,
typename... Ts>
220 fill(T&& item, Ts&&... items) {
221 new (getItem(seq_++)) T(std::forward<T>(item));
222 fill(std::forward<Ts>(items)...);
225 template <
typename It>
227 fillBatch(It begin,
size_t n) {
228 for (
auto it = begin; n; ++it, --n) {
229 using T = decltype(*begin);
230 new (getItem(seq_++)) T(*it);
234 template <
typename Item,
typename It>
236 fillBatchInPlace(It begin,
size_t n) {
237 for (
auto it = begin; n; ++it, --n) {
238 new (getItem(seq_++)) Item(*it);
248 std::condition_variable hasItem_;
249 std::condition_variable hasSlot_;
252 namespace blocking_buffer_detail {
254 using Sequence = uint64_t;
256 : buf_(buf), seq_(seq){}
257 iterator() : buf_(
nullptr), seq_(0){}
258 void clear() {buf_ =
nullptr;}
260 iterator& operator ++() HMBDC_RESTRICT {
265 iterator operator ++(
int) HMBDC_RESTRICT {
271 iterator operator + (
size_t dis)
const HMBDC_RESTRICT {
277 iterator& operator += (
size_t dis) HMBDC_RESTRICT {
282 size_t operator - (
iterator const& other)
const HMBDC_RESTRICT {
283 return seq_ - other.seq_;
286 explicit operator bool()
const HMBDC_RESTRICT {
290 bool operator < (
iterator const& other)
const HMBDC_RESTRICT {
291 return seq_ < other.seq_;
294 bool operator == (
iterator const& other)
const HMBDC_RESTRICT {
return seq_ == other.seq_;}
295 bool operator != (
iterator const& other)
const HMBDC_RESTRICT {
return seq_ != other.seq_;}
296 void* operator*()
const HMBDC_RESTRICT {
return buf_->getItem(seq_);}
297 template <
typename T> T&
get()
const HMBDC_RESTRICT {
return *
static_cast<T*
>(**this);}
298 template <
typename T>
299 T* operator->() HMBDC_RESTRICT {
return static_cast<T*
>(buf_->getItem(seq_));}
309 peek(iterator& b, iterator& e) {
310 std::unique_lock<std::mutex> lck(mutex_);
311 e = iterator(
this, seq_);
312 b = iterator(
this, seq_ - size_);
317 BlockingBuffer::iterator
320 std::unique_lock<std::mutex> lck(mutex_);
322 return iterator(
this, seq_ - size_);
Definition: BlockingBuffer.hpp:16
Definition: BlockingBuffer.hpp:253