1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/Compile.hpp" 4 #include "hmbdc/time/Time.hpp" 5 #include "hmbdc/Config.hpp" 7 #include <condition_variable> 11 namespace hmbdc {
namespace pattern {
13 namespace blocking_buffer_detail {
19 using value_type =
void *;
21 : maxItemSize_(maxItemSize)
25 store_ = (
char*)memalign(SMP_CACHE_BYTES, capacity*maxItemSize);
/**
 * @brief translate a sequence number into the address of its slot
 * @details the slot index is seq modulo capacity_, and each slot is
 * maxItemSize_ bytes wide, so the sequence wraps around the store
 * @param seq sequence number of the item
 * @return address of the first byte of that slot (value_type is void*)
 */
35 value_type getItem(
size_t seq) {
36 return store_ + seq % capacity_ * maxItemSize_;
/**
 * @brief size in bytes of one slot
 * @return the maxItemSize_ value the buffer was constructed with
 */
39 size_t maxItemSize()
const {
return maxItemSize_;}
/**
 * @brief number of slots in the buffer
 * @return the stored capacity_ value
 */
40 size_t capacity()
const {
return capacity_;}
/**
 * @brief copy one item into the buffer, blocking while no slot is free
 * @param item address of the bytes to copy into the next slot
 * @param sizeHint number of bytes to copy; 0 means copy maxItemSize_ bytes
 * @note listing lines 46-47 are not shown in this view, so the size_ update
 * and whatever condition guards the notification below cannot be confirmed here
 */
41 void put(
void const* item,
size_t sizeHint = 0) {
43 unique_lock<mutex> lck(mutex_);
// sleep on hasSlot_ until fewer than capacity_ items are stored
44 hasSlot_.wait(lck, [
this]{
return capacity_ > size_;});
// write into the slot for the current sequence, then advance seq_
45 memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
// wake one thread waiting on hasItem_
48 hasItem_.notify_one();
/**
 * @brief construct several items in the buffer under a single lock hold,
 * blocking until there is room for all of them
 * @tparam T type of the first item
 * @tparam Ts types of the remaining items
 * @param item first item to store
 * @param items remaining items to store, in order
 * @note the return type (listing line 53) is not shown in this view
 */
52 template <
typename T,
typename... Ts>
54 put(T&& item, Ts&&... items) {
56 unique_lock<mutex> lck(mutex_);
// room is needed for 1 + sizeof...(Ts) items, hence the strict comparison
57 hasSlot_.wait(lck, [
this]{
return capacity_ > size_ +
sizeof...(Ts);});
58 fill(std::forward<T>(item), std::forward<Ts>(items)...);
59 size_ +=
sizeof...(items) + 1;
// size_ equal to the count just added means the buffer was empty before
60 if (size_ ==
sizeof...(items) + 1) {
62 hasItem_.notify_one();
/**
 * @brief attempt, without waiting, to add n items read from an iterator
 * @tparam It iterator type
 * @param begin iterator to the first item to add
 * @param n number of items to add
 * @note the return type and listing lines 73-76 (the work done on success)
 * are not shown in this view
 * NOTE(review): for n == 0 with size_ == 0 the expression size_ + n - 1 wraps
 * around if size_ is an unsigned type such as size_t (its declaration is not
 * visible here), which would make the predicate false - confirm intended
 */
66 template <
typename It>
68 tryPutBatch(It begin,
size_t n) {
70 unique_lock<mutex> lck(mutex_);
// zero timeout: the predicate is evaluated but the call does not sleep
71 if (hasSlot_.wait_for(
72 lck, chrono::seconds(0), [
this, n]{return capacity_ > size_ + n - 1;})) {
77 hasItem_.notify_all();
/**
 * @brief attempt, without waiting, to construct n objects of type Item in the
 * buffer from the values read through an iterator
 * @tparam Item type constructed in each slot
 * @tparam It iterator type
 * @param begin iterator to the first source value
 * @param n number of items to add
 * @note the return type and listing lines 92-94 are not shown in this view
 * NOTE(review): for n == 0 with size_ == 0 the expression size_ + n - 1 wraps
 * around if size_ is an unsigned type such as size_t (its declaration is not
 * visible here) - confirm intended
 */
84 template <
typename Item,
typename It>
86 tryPutBatchInPlace(It begin,
size_t n) {
88 unique_lock<mutex> lck(mutex_);
// zero timeout: succeed only if n slots are free right now
89 if (hasSlot_.wait_for(
90 lck, chrono::seconds(0), [
this, n]{return capacity_ > size_ + n - 1;})) {
91 fillBatchInPlace<Item>(begin, n);
95 hasItem_.notify_all();
/**
 * @brief store one item by passing its address to put
 * @tparam T item type
 * @param item the item whose address is handed to put
 * @note which put overload receives &item depends on declarations that are
 * not fully visible in this view (the return type / constraints of the
 * templated put are elided) - verify before changing this call
 */
103 template <
typename T>
void putSome(T
const& item) {put(&item);}
/**
 * @brief construct a T directly in the next slot, blocking while no slot is free
 * @tparam T type to construct in the slot
 * @tparam Args constructor argument types
 * @param args arguments forwarded to T's constructor
 * @note listing lines 110-111 are not shown in this view, so the size_ update
 * and the condition guarding the notification cannot be confirmed here
 */
104 template <
typename T,
typename ...Args>
105 void putInPlace(Args&&... args) {
107 unique_lock<mutex> lck(mutex_);
108 hasSlot_.wait(lck, [
this](){
return capacity_ > size_;});
// placement-new into the slot for the current sequence, then advance seq_
109 new (getItem(seq_++)) T(std::forward<Args>(args)...);
112 hasItem_.notify_one();
/**
 * @brief non-waiting variant of putInPlace: construct a T in the next slot
 * only if a slot is free right now
 * @tparam T type to construct in the slot
 * @tparam Args constructor argument types
 * @param args arguments forwarded to T's constructor
 * @return bool; the return statements themselves are not shown in this view
 * @note listing line 120 - the statement controlled by the if below - is
 * elided, so the placement new that follows is NOT the body of that if
 */
115 template <
typename T,
typename ...Args>
116 bool tryPutInPlace(Args&&... args) {
118 unique_lock<mutex> lck(mutex_);
// zero timeout: test for a free slot without sleeping
119 if (!hasSlot_.wait_for(lck, chrono::seconds(0), [
this](){return capacity_ > size_;}))
121 new (getItem(seq_++)) T(std::forward<Args>(args)...);
124 hasItem_.notify_one();
/**
 * @brief copy one item into the buffer, waiting at most a given duration for
 * a free slot
 * @param item address of the bytes to copy into the next slot
 * @param sizeHint number of bytes to copy; 0 means copy maxItemSize_ bytes
 * @return bool; the return statements themselves are not shown in this view
 * @note the declaration of the timeout parameter used below, and listing
 * line 135 (the statement controlled by the if), are elided from this view
 */
129 bool tryPut(
void const* item,
size_t sizeHint = 0
132 unique_lock<mutex> lck(mutex_);
// wait up to timeout (converted to nanoseconds) for a free slot
133 if (!hasSlot_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds())
134 , [
this](){
return capacity_ > size_;}))
136 memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
139 hasItem_.notify_one();
/**
 * @brief typed convenience form of tryPut: forwards the item's address,
 * sizeof(item) as the byte count, and the timeout
 * @tparam T item type
 * @param item the item to copy into the buffer
 * @return whatever the forwarded tryPut call returns
 * @note the timeout parameter's declaration is elided from this view
 */
144 template <
typename T>
145 bool tryPut(T
const& item
147 return tryPut(&item,
sizeof(item), timeout);
/**
 * @brief report whether every slot is occupied
 * @return true when size_ equals capacity_
 * @note mutex_ is not acquired in this function
 */
150 bool isFull()
const {
return size_ == capacity_;}
/**
 * @brief copy the oldest item out of the buffer, blocking while it is empty
 * @param dest destination the item bytes are copied to
 * @param sizeHint number of bytes to copy; 0 means copy maxItemSize_ bytes
 */
151 void take(
void *dest,
size_t sizeHint = 0) {
153 unique_lock<mutex> lck(mutex_);
// sleep on hasItem_ until size_ is non-zero
154 hasItem_.wait(lck, [
this](){
return size_;});
// the oldest stored item sits size_ sequences behind seq_
155 memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
// compare before the decrement: notify only if the buffer was full
156 if (size_-- == capacity_) {
158 hasSlot_.notify_all();
/**
 * @brief typed convenience form of take: passes the destination's address
 * and sizeof(T) as the byte count to take
 * @tparam T item type
 * @note the signature line (listing line 163) is not shown in this view
 */
162 template <
typename T>
164 take(&dest,
sizeof(T));
/**
 * @brief copy the oldest item out of the buffer, waiting at most a given
 * duration for one to arrive
 * @param dest destination the item bytes are copied to
 * @param sizeHint number of bytes to copy; 0 means copy maxItemSize_ bytes
 * @param timeout longest time to wait; defaults to zero seconds
 * @return false if the wait ended with the buffer still empty; the success
 * return is on a listing line not shown in this view
 */
168 bool tryTake(
void *dest,
size_t sizeHint = 0,
time::Duration timeout = time::Duration::seconds(0)) {
170 unique_lock<mutex> lck(mutex_);
171 if (!hasItem_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds()), [
this](){
return size_;}))
return false;
// the oldest stored item sits size_ sequences behind seq_
172 memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
// compare before the decrement: notify only if the buffer was full
173 if (size_-- == capacity_) {
175 hasSlot_.notify_all();
/**
 * @brief typed convenience form of tryTake: passes the destination's address
 * and sizeof(T) as the byte count, leaving the timeout at its default
 * @tparam T item type
 * @param dest object that receives the item bytes
 * @return whatever the forwarded tryTake call returns
 */
180 template <
typename T>
181 bool tryTake(T& dest) {
182 return tryTake(&dest,
sizeof(T));
/**
 * @brief copy stored items, oldest first, into an output range, blocking
 * until at least one item is available
 * @tparam itOut output iterator type
 * @param b start of the destination range
 * @param e end of the destination range
 * @return size_t; the return statement is not shown in this view
 * (ret below is the number of items copied)
 * @note the declaration of s (listing line 190) and listing line 195 are
 * elided; presumably s starts at size_ and size_ is reduced to s before the
 * full-buffer check - TODO confirm against the complete file
 */
185 template <
typename itOut>
186 size_t take(itOut b, itOut e) {
188 unique_lock<mutex> lck(mutex_);
189 hasItem_.wait(lck, [
this](){
return size_;});
// stop when the items counted by s run out or the destination range is exhausted
191 while (s && b != e) {
// copy no more than one slot and no more than one destination element
192 memcpy(&*b++, getItem(seq_ - s--), min(maxItemSize_,
sizeof(*b)));
194 auto ret = size_ - s;
196 if (size_ + ret == capacity_) {
198 hasSlot_.notify_all();
/**
 * @brief discard step that follows a peek (only the opening of the body is
 * visible in this view)
 * @param len presumably the number of peeked items to drop - the lines that
 * use it are elided, TODO confirm
 */
205 void wasteAfterPeek(
size_t len) {
206 std::unique_lock<std::mutex> lck(mutex_);
// if the buffer is currently full, wake every thread waiting for a slot
207 if (size_ == capacity_) hasSlot_.notify_all();
213 unique_lock<mutex> lck(mutex_);
214 hasItem_.wait_for(lck, std::chrono::nanoseconds(timeout.nanoseconds())
215 , [
this](){
return size_;});
218 size_t remainingSize()
const {
224 hasSlot_.notify_all();
/**
 * @brief construct the given items in consecutive slots, one per recursion step
 * @tparam T type of the first item
 * @tparam Ts types of the remaining items
 * @param item value used to construct a T in the next slot
 * @param items remaining values, handled by the recursive call
 * @note the return type and the overload that ends the recursion are not
 * shown in this view; no lock is taken here (the visible caller, put, holds
 * mutex_ while calling)
 * NOTE(review): T is deduced from a forwarding reference, so an lvalue
 * argument makes T a reference type and `new (...) T(...)` would not be
 * valid for it - confirm callers only pass rvalues
 */
229 template <
typename T,
typename... Ts>
231 fill(T&& item, Ts&&... items) {
// placement-new into the slot for the current sequence, then advance seq_
232 new (getItem(seq_++)) T(std::forward<T>(item));
233 fill(std::forward<Ts>(items)...);
/**
 * @brief copy-construct n items read through an iterator into consecutive slots
 * @tparam It iterator type
 * @param begin iterator to the first source item
 * @param n number of items to construct
 * @note the return type is not shown in this view; no lock is taken here
 * NOTE(review): decltype(*begin) is a reference type for ordinary iterators
 * (dereferencing yields an lvalue), and a new-expression cannot create a
 * reference type - the element type probably needs the reference stripped;
 * verify with a real instantiation
 */
236 template <
typename It>
238 fillBatch(It begin,
size_t n) {
// n counts down to zero while the iterator advances
239 for (
auto it = begin; n; ++it, --n) {
240 using T = decltype(*begin);
241 new (getItem(seq_++)) T(*it);
/**
 * @brief construct n objects of type Item in consecutive slots, each from the
 * value read through the iterator
 * @tparam Item type constructed in each slot
 * @tparam It iterator type
 * @param begin iterator to the first source value
 * @param n number of items to construct
 * @note the return type is not shown in this view; no lock is taken here
 * (the visible caller, tryPutBatchInPlace, holds mutex_ while calling)
 */
245 template <
typename Item,
typename It>
247 fillBatchInPlace(It begin,
size_t n) {
// n counts down to zero while the iterator advances
248 for (
auto it = begin; n; ++it, --n) {
249 new (getItem(seq_++)) Item(*it);
259 std::condition_variable hasItem_;
260 std::condition_variable hasSlot_;
263 namespace blocking_buffer_detail {
265 using Sequence = uint64_t;
267 : buf_(buf), seq_(seq){}
/**
 * @brief default constructor: no buffer attached (buf_ is nullptr) and
 * sequence 0
 */
268 iterator() : buf_(
nullptr), seq_(0){}
/**
 * @brief detach the iterator from its buffer by resetting buf_ to nullptr;
 * seq_ is left unchanged
 */
269 void clear() {buf_ =
nullptr;}
271 iterator& operator ++() HMBDC_RESTRICT {
276 iterator operator ++(
int) HMBDC_RESTRICT {
282 iterator operator + (
size_t dis)
const HMBDC_RESTRICT {
288 iterator& operator += (
size_t dis) HMBDC_RESTRICT {
/**
 * @brief distance between two iterators, in sequence numbers
 * @param other the iterator to subtract
 * @return seq_ minus other.seq_; buf_ is not consulted
 */
293 size_t operator - (
iterator const& other)
const HMBDC_RESTRICT {
294 return seq_ - other.seq_;
297 explicit operator bool()
const HMBDC_RESTRICT {
/**
 * @brief order iterators by sequence number
 * @param other the iterator to compare against
 * @return true when this seq_ is smaller than other.seq_; buf_ is not consulted
 */
301 bool operator < (
iterator const& other)
const HMBDC_RESTRICT {
302 return seq_ < other.seq_;
/**
 * @brief equality by sequence number only
 * @param other the iterator to compare against
 * @return true when both iterators hold the same seq_; buf_ is not compared
 */
305 bool operator == (
iterator const& other)
const HMBDC_RESTRICT {
return seq_ == other.seq_;}
/**
 * @brief inequality by sequence number only
 * @param other the iterator to compare against
 * @return true when the two seq_ values differ; buf_ is not compared
 */
306 bool operator != (
iterator const& other)
const HMBDC_RESTRICT {
return seq_ != other.seq_;}
/**
 * @brief dereference to the untyped slot address
 * @return the result of buf_->getItem(seq_); buf_ is dereferenced without a
 * null check
 */
307 void* operator*()
const HMBDC_RESTRICT {
return buf_->getItem(seq_);}
/**
 * @brief view the current slot as an object of type T
 * @tparam T type to interpret the slot as; no check is made that the slot
 * actually holds a T
 * @return reference obtained by static_cast of the slot address to T*
 */
308 template <
typename T> T&
get()
const HMBDC_RESTRICT {
return *
static_cast<T*
>(**this);}
/**
 * @brief member access into the current slot, treating it as a T
 * @tparam T type to interpret the slot as; it appears only in the return
 * type, so it has to be supplied explicitly by the caller
 * @return the slot address from buf_->getItem(seq_) cast to T*
 */
309 template <
typename T>
310 T* operator->() HMBDC_RESTRICT {
return static_cast<T*
>(buf_->getItem(seq_));}
321 std::unique_lock<std::mutex> lck(mutex_);
328 BlockingBuffer::iterator
331 std::unique_lock<std::mutex> lck(mutex_);
333 return iterator(
this, seq_ - size_);
Definition: BlockingBuffer.hpp:17
Definition: TypedString.hpp:84
Definition: BlockingBuffer.hpp:264
Definition: LockFreeBufferMisc.hpp:89