hmbdc
simplify-high-performance-messaging-programming
BlockingBuffer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/Compile.hpp"
4 #include "hmbdc/time/Time.hpp"
5 #include "hmbdc/Config.hpp"
6 #include <mutex> // std::mutex, std::unique_lock
7 #include <condition_variable> // std::condition_variable
8 #include <malloc.h>
9 
10 
11 namespace hmbdc { namespace pattern {
12 
13 namespace blocking_buffer_detail {
14 struct iterator;
15 }
16 
19  using value_type = void *;
20  BlockingBuffer(size_t maxItemSize, size_t capacity)
21  : maxItemSize_(maxItemSize)
22  , capacity_(capacity)
23  , size_(0)
24  , seq_(0) {
25  store_ = (char*)memalign(SMP_CACHE_BYTES, capacity*maxItemSize);
26  }
27 
28  ~BlockingBuffer() {
29  ::free(store_);
30  }
31 
32  BlockingBuffer(BlockingBuffer const&) = delete;
33  BlockingBuffer& operator = (BlockingBuffer const&) = delete;
34 
35  value_type getItem(size_t seq) {
36  return store_ + seq % capacity_ * maxItemSize_;
37  }
38 
39  size_t maxItemSize() const {return maxItemSize_;}
40  size_t capacity() const {return capacity_;}
41  void put(void const* item, size_t sizeHint = 0) {
42  using namespace std;
43  unique_lock<mutex> lck(mutex_);
44  hasSlot_.wait(lck, [this]{return capacity_ > size_;});
45  memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
46  if (++size_ == 1) {
47  lck.unlock();
48  hasItem_.notify_one();
49  }
50  }
51 
52  template <typename T, typename... Ts>
53  void
54  put(T&& item, Ts&&... items) {
55  using namespace std;
56  unique_lock<mutex> lck(mutex_);
57  hasSlot_.wait(lck, [this]{return capacity_ > size_ + sizeof...(Ts);});
58  fill(std::forward<T>(item), std::forward<Ts>(items)...);
59  size_ += sizeof...(items) + 1;
60  if (size_ == sizeof...(items) + 1) {
61  lck.unlock();
62  hasItem_.notify_one();
63  }
64  }
65 
66  template <typename It>
67  bool
68  tryPutBatch(It begin, size_t n) {
69  using namespace std;
70  unique_lock<mutex> lck(mutex_);
71  if (hasSlot_.wait_for(
72  lck, chrono::seconds(0), [this, n]{return capacity_ > size_ + n - 1;})) {
73  fillBatch(begin, n);
74  size_ += n;
75  if (size_ == n) {
76  lck.unlock();
77  hasItem_.notify_all();
78  }
79  return true;
80  }
81  return false;
82  }
83 
84  template <typename Item, typename It>
85  bool
86  tryPutBatchInPlace(It begin, size_t n) {
87  using namespace std;
88  unique_lock<mutex> lck(mutex_);
89  if (hasSlot_.wait_for(
90  lck, chrono::seconds(0), [this, n]{return capacity_ > size_ + n - 1;})) {
91  fillBatchInPlace<Item>(begin, n);
92  size_ += n;
93  if (size_ == n) {
94  lck.unlock();
95  hasItem_.notify_all();
96  }
97  return true;
98  }
99  return false;
100  }
101 
102 
103  template <typename T> void putSome(T const& item) {put(&item);}
104  template <typename T, typename ...Args>
105  void putInPlace(Args&&... args) {
106  using namespace std;
107  unique_lock<mutex> lck(mutex_);
108  hasSlot_.wait(lck, [this](){return capacity_ > size_;});
109  new (getItem(seq_++)) T(std::forward<Args>(args)...);
110  if (++size_ == 1) {
111  lck.unlock();
112  hasItem_.notify_one();
113  }
114  }
115  template <typename T, typename ...Args>
116  bool tryPutInPlace(Args&&... args) {
117  using namespace std;
118  unique_lock<mutex> lck(mutex_);
119  if (!hasSlot_.wait_for(lck, chrono::seconds(0), [this](){return capacity_ > size_;}))
120  return false;
121  new (getItem(seq_++)) T(std::forward<Args>(args)...);
122  if (++size_ == 1) {
123  lck.unlock();
124  hasItem_.notify_one();
125  }
126  return true;
127  }
128 
129  bool tryPut(void const* item, size_t sizeHint = 0
130  , time::Duration timeout = time::Duration::seconds(0)) {
131  using namespace std;
132  unique_lock<mutex> lck(mutex_);
133  if (!hasSlot_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds())
134  , [this](){return capacity_ > size_;}))
135  return false;
136  memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
137  if (++size_ == 1) {
138  lck.unlock();
139  hasItem_.notify_one();
140  }
141  return true;
142  }
143 
144  template <typename T>
145  bool tryPut(T const& item
146  , time::Duration timeout = time::Duration::seconds(0)) {
147  return tryPut(&item, sizeof(item), timeout);
148  }
149 
150  bool isFull() const {return size_ == capacity_;}
151  void take(void *dest, size_t sizeHint = 0) {
152  using namespace std;
153  unique_lock<mutex> lck(mutex_);
154  hasItem_.wait(lck, [this](){return size_;});
155  memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
156  if (size_-- == capacity_) {
157  lck.unlock();
158  hasSlot_.notify_all();
159  }
160  }
161 
162  template <typename T>
163  T& take(T& dest) {
164  take(&dest, sizeof(T));
165  return dest;
166  }
167 
168  bool tryTake(void *dest, size_t sizeHint = 0, time::Duration timeout = time::Duration::seconds(0)) {
169  using namespace std;
170  unique_lock<mutex> lck(mutex_);
171  if (!hasItem_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds()), [this](){return size_;})) return false;
172  memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
173  if (size_-- == capacity_) {
174  lck.unlock();
175  hasSlot_.notify_all();
176  }
177  return true;
178  }
179 
180  template <typename T>
181  bool tryTake(T& dest) {
182  return tryTake(&dest, sizeof(T));
183  }
184 
185  template <typename itOut>
186  size_t take(itOut b, itOut e) {
187  using namespace std;
188  unique_lock<mutex> lck(mutex_);
189  hasItem_.wait(lck, [this](){return size_;});
190  auto s = size_;
191  while (s && b != e) {
192  memcpy(&*b++, getItem(seq_ - s--), min(maxItemSize_, sizeof(*b)));
193  }
194  auto ret = size_ - s;
195  size_ = s;
196  if (size_ + ret == capacity_) {
197  lck.unlock();
198  hasSlot_.notify_all();
199  }
200  return ret;
201  }
202 
203  iterator peek();
204  size_t peek(iterator& b, iterator& e);
205  void wasteAfterPeek(size_t len) {
206  std::unique_lock<std::mutex> lck(mutex_);
207  if (size_ == capacity_) hasSlot_.notify_all();
208  size_ -= len;
209  }
210 
211  void waitItem(time::Duration timeout) {
212  using namespace std;
213  unique_lock<mutex> lck(mutex_);
214  hasItem_.wait_for(lck, std::chrono::nanoseconds(timeout.nanoseconds())
215  , [this](){return size_;});
216  }
217 
218  size_t remainingSize() const {
219  return size_;
220  }
221  void reset() {
222  size_ = 0;
223  seq_ = 0;
224  hasSlot_.notify_all();
225  }
226 
227 private:
228  void fill(){}
229  template <typename T, typename... Ts>
230  void
231  fill(T&& item, Ts&&... items) {
232  new (getItem(seq_++)) T(std::forward<T>(item));
233  fill(std::forward<Ts>(items)...);
234  }
235 
236  template <typename It>
237  void
238  fillBatch(It begin, size_t n) {
239  for (auto it = begin; n; ++it, --n) {
240  using T = decltype(*begin);
241  new (getItem(seq_++)) T(*it);
242  }
243  }
244 
245  template <typename Item, typename It>
246  void
247  fillBatchInPlace(It begin, size_t n) {
248  for (auto it = begin; n; ++it, --n) {
249  new (getItem(seq_++)) Item(*it);
250  }
251  }
252 
253  size_t maxItemSize_;
254  size_t capacity_;
255  char* store_;
256  size_t size_;
257  size_t seq_;
258  std::mutex mutex_;
259  std::condition_variable hasItem_;
260  std::condition_variable hasSlot_;
261 };
262 
263 namespace blocking_buffer_detail {
264 struct iterator {
265  using Sequence = uint64_t;
266  iterator(BlockingBuffer* HMBDC_RESTRICT buf, Sequence seq)
267  : buf_(buf), seq_(seq){}
268  iterator() : buf_(nullptr), seq_(0){}
269  void clear() {buf_ = nullptr;}
270 
271  iterator& operator ++() HMBDC_RESTRICT {
272  ++seq_;
273  return *this;
274  }
275 
276  iterator operator ++(int) HMBDC_RESTRICT {
277  iterator tmp = *this;
278  ++*this;
279  return tmp;
280  }
281 
282  iterator operator + (size_t dis) const HMBDC_RESTRICT {
283  iterator tmp = *this;
284  tmp.seq_ += dis;
285  return tmp;
286  }
287 
288  iterator& operator += (size_t dis) HMBDC_RESTRICT {
289  seq_ += dis;
290  return *this;
291  }
292 
293  size_t operator - (iterator const& other) const HMBDC_RESTRICT {
294  return seq_ - other.seq_;
295  }
296 
297  explicit operator bool() const HMBDC_RESTRICT {
298  return buf_;
299  }
300 
301  bool operator < (iterator const& other) const HMBDC_RESTRICT {
302  return seq_ < other.seq_;
303  }
304 
305  bool operator == (iterator const& other) const HMBDC_RESTRICT {return seq_ == other.seq_;}
306  bool operator != (iterator const& other) const HMBDC_RESTRICT {return seq_ != other.seq_;}
307  void* operator*() const HMBDC_RESTRICT {return buf_->getItem(seq_);}
308  template <typename T> T& get() const HMBDC_RESTRICT {return *static_cast<T*>(**this);}
309  template <typename T>
310  T* operator->() HMBDC_RESTRICT {return static_cast<T*>(buf_->getItem(seq_));}
311 //private:
312  BlockingBuffer* buf_;
313  Sequence seq_;
314 };
315 } //blocking_buffer_detail
316 
317 inline
318 size_t
319 BlockingBuffer::
320 peek(iterator& b, iterator& e) {
321  std::unique_lock<std::mutex> lck(mutex_);
322  e = iterator(this, seq_);
323  b = iterator(this, seq_ - size_);
324  return size_;
325 }
326 
327 inline
328 BlockingBuffer::iterator
329 BlockingBuffer::
330 peek() {
331  std::unique_lock<std::mutex> lck(mutex_);
332  if (size_) {
333  return iterator(this, seq_ - size_);
334  }
335  return iterator();
336 }
337 }}
Definition: BlockingBuffer.hpp:17
Definition: TypedString.hpp:84
Definition: Time.hpp:134
Definition: BlockingBuffer.hpp:264
Definition: Base.hpp:12
Definition: LockFreeBufferMisc.hpp:89