hmbdc
simplify-high-performance-messaging-programming
BlockingBuffer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/Compile.hpp"
4 #include "hmbdc/time/Time.hpp"
5 #include "hmbdc/Config.hpp"
6 #include <mutex> // std::mutex, std::unique_lock
7 #include <condition_variable> // std::condition_variable
8 
9 
10 namespace hmbdc { namespace pattern {
11 
12 namespace blocking_buffer_detail {
13 struct iterator;
14 }
15 
18  using value_type = void *;
19  BlockingBuffer(size_t, size_t);
20  ~BlockingBuffer();
21  BlockingBuffer(BlockingBuffer const&) = delete;
22  BlockingBuffer& operator = (BlockingBuffer const&) = delete;
23 
24  value_type getItem(size_t seq) {
25  return store_ + seq % capacity_ * maxItemSize_;
26  }
27 
28  size_t maxItemSize() const {return maxItemSize_;}
29  size_t capacity() const {return capacity_;}
30  void put(void const* item, size_t sizeHint = 0) {
31  using namespace std;
32  unique_lock<mutex> lck(mutex_);
33  hasSlot_.wait(lck, [this]{return capacity_ > size_;});
34  memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
35  if (++size_ == 1) {
36  lck.unlock();
37  hasItem_.notify_one();
38  }
39  }
40 
41  template <typename T, typename... Ts>
42  void
43  put(T&& item, Ts&&... items) {
44  using namespace std;
45  unique_lock<mutex> lck(mutex_);
46  hasSlot_.wait(lck, [this]{return capacity_ > size_ + sizeof...(Ts);});
47  fill(std::forward<T>(item), std::forward<Ts>(items)...);
48  size_ += sizeof...(items) + 1;
49  if (size_ == sizeof...(items) + 1) {
50  lck.unlock();
51  hasItem_.notify_one();
52  }
53  }
54 
55  template <typename It>
56  bool
57  tryPutBatch(It begin, size_t n) {
58  using namespace std;
59  unique_lock<mutex> lck(mutex_);
60  if (hasSlot_.wait_for(
61  lck, chrono::seconds(0), [this, n]{return capacity_ > size_ + n - 1;})) {
62  fillBatch(begin, n);
63  size_ += n;
64  if (size_ == n) {
65  lck.unlock();
66  hasItem_.notify_all();
67  }
68  return true;
69  }
70  return false;
71  }
72 
73  template <typename Item, typename It>
74  bool
75  tryPutBatchInPlace(It begin, size_t n) {
76  using namespace std;
77  unique_lock<mutex> lck(mutex_);
78  if (hasSlot_.wait_for(
79  lck, chrono::seconds(0), [this, n]{return capacity_ > size_ + n - 1;})) {
80  fillBatchInPlace<Item>(begin, n);
81  size_ += n;
82  if (size_ == n) {
83  lck.unlock();
84  hasItem_.notify_all();
85  }
86  return true;
87  }
88  return false;
89  }
90 
91 
92  template <typename T> void putSome(T const& item) {put(&item);}
93  template <typename T, typename ...Args>
94  void putInPlace(Args&&... args) {
95  using namespace std;
96  unique_lock<mutex> lck(mutex_);
97  hasSlot_.wait(lck, [this](){return capacity_ > size_;});
98  new (getItem(seq_++)) T(std::forward<Args>(args)...);
99  if (++size_ == 1) {
100  lck.unlock();
101  hasItem_.notify_one();
102  }
103  }
104  template <typename T, typename ...Args>
105  bool tryPutInPlace(Args&&... args) {
106  using namespace std;
107  unique_lock<mutex> lck(mutex_);
108  if (!hasSlot_.wait_for(lck, chrono::seconds(0), [this](){return capacity_ > size_;}))
109  return false;
110  new (getItem(seq_++)) T(std::forward<Args>(args)...);
111  if (++size_ == 1) {
112  lck.unlock();
113  hasItem_.notify_one();
114  }
115  return true;
116  }
117 
118  bool tryPut(void const* item, size_t sizeHint = 0
119  , time::Duration timeout = time::Duration::seconds(0)) {
120  using namespace std;
121  unique_lock<mutex> lck(mutex_);
122  if (!hasSlot_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds())
123  , [this](){return capacity_ > size_;}))
124  return false;
125  memcpy(getItem(seq_++), item, sizeHint?sizeHint:maxItemSize_);
126  if (++size_ == 1) {
127  lck.unlock();
128  hasItem_.notify_one();
129  }
130  return true;
131  }
132 
133  template <typename T>
134  bool tryPut(T const& item
135  , time::Duration timeout = time::Duration::seconds(0)) {
136  return tryPut(&item, sizeof(item), timeout);
137  }
138 
139  bool isFull() const {return size_ == capacity_;}
140  void take(void *dest, size_t sizeHint = 0) {
141  using namespace std;
142  unique_lock<mutex> lck(mutex_);
143  hasItem_.wait(lck, [this](){return size_;});
144  memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
145  if (size_-- == capacity_) {
146  lck.unlock();
147  hasSlot_.notify_all();
148  }
149  }
150 
151  template <typename T>
152  T& take(T& dest) {
153  take(&dest, sizeof(T));
154  return dest;
155  }
156 
157  bool tryTake(void *dest, size_t sizeHint = 0, time::Duration timeout = time::Duration::seconds(0)) {
158  using namespace std;
159  unique_lock<mutex> lck(mutex_);
160  if (!hasItem_.wait_for(lck, chrono::nanoseconds(timeout.nanoseconds()), [this](){return size_;})) return false;
161  memcpy(dest, getItem(seq_ - size_), sizeHint?sizeHint:maxItemSize_);
162  if (size_-- == capacity_) {
163  lck.unlock();
164  hasSlot_.notify_all();
165  }
166  return true;
167  }
168 
169  template <typename T>
170  bool tryTake(T& dest) {
171  return tryTake(&dest, sizeof(T));
172  }
173 
174  template <typename itOut>
175  size_t take(itOut b, itOut e) {
176  using namespace std;
177  unique_lock<mutex> lck(mutex_);
178  hasItem_.wait(lck, [this](){return size_;});
179  auto s = size_;
180  while (s && b != e) {
181  memcpy(&*b++, getItem(seq_ - s--), min(maxItemSize_, sizeof(*b)));
182  }
183  auto ret = size_ - s;
184  size_ = s;
185  if (size_ + ret == capacity_) {
186  lck.unlock();
187  hasSlot_.notify_all();
188  }
189  return ret;
190  }
191 
192  iterator peek();
193  size_t peek(iterator& b, iterator& e);
194  void wasteAfterPeek(size_t len) {
195  std::unique_lock<std::mutex> lck(mutex_);
196  if (size_ == capacity_) hasSlot_.notify_all();
197  size_ -= len;
198  }
199 
200  void waitItem(time::Duration timeout) {
201  using namespace std;
202  unique_lock<mutex> lck(mutex_);
203  hasItem_.wait_for(lck, std::chrono::nanoseconds(timeout.nanoseconds())
204  , [this](){return size_;});
205  }
206 
207  size_t remainingSize() const {
208  return size_;
209  }
210  void reset() {
211  size_ = 0;
212  seq_ = 0;
213  hasSlot_.notify_all();
214  }
215 
216 private:
217  void fill(){}
218  template <typename T, typename... Ts>
219  void
220  fill(T&& item, Ts&&... items) {
221  new (getItem(seq_++)) T(std::forward<T>(item));
222  fill(std::forward<Ts>(items)...);
223  }
224 
225  template <typename It>
226  void
227  fillBatch(It begin, size_t n) {
228  for (auto it = begin; n; ++it, --n) {
229  using T = decltype(*begin);
230  new (getItem(seq_++)) T(*it);
231  }
232  }
233 
234  template <typename Item, typename It>
235  void
236  fillBatchInPlace(It begin, size_t n) {
237  for (auto it = begin; n; ++it, --n) {
238  new (getItem(seq_++)) Item(*it);
239  }
240  }
241 
242  size_t maxItemSize_;
243  size_t capacity_;
244  char* store_;
245  size_t size_;
246  size_t seq_;
247  std::mutex mutex_;
248  std::condition_variable hasItem_;
249  std::condition_variable hasSlot_;
250 };
251 
252 namespace blocking_buffer_detail {
253 struct iterator {
254  using Sequence = uint64_t;
255  iterator(BlockingBuffer* HMBDC_RESTRICT buf, Sequence seq)
256  : buf_(buf), seq_(seq){}
257  iterator() : buf_(nullptr), seq_(0){}
258  void clear() {buf_ = nullptr;}
259 
260  iterator& operator ++() HMBDC_RESTRICT {
261  ++seq_;
262  return *this;
263  }
264 
265  iterator operator ++(int) HMBDC_RESTRICT {
266  iterator tmp = *this;
267  ++*this;
268  return tmp;
269  }
270 
271  iterator operator + (size_t dis) const HMBDC_RESTRICT {
272  iterator tmp = *this;
273  tmp.seq_ += dis;
274  return tmp;
275  }
276 
277  iterator& operator += (size_t dis) HMBDC_RESTRICT {
278  seq_ += dis;
279  return *this;
280  }
281 
282  size_t operator - (iterator const& other) const HMBDC_RESTRICT {
283  return seq_ - other.seq_;
284  }
285 
286  explicit operator bool() const HMBDC_RESTRICT {
287  return buf_;
288  }
289 
290  bool operator < (iterator const& other) const HMBDC_RESTRICT {
291  return seq_ < other.seq_;
292  }
293 
294  bool operator == (iterator const& other) const HMBDC_RESTRICT {return seq_ == other.seq_;}
295  bool operator != (iterator const& other) const HMBDC_RESTRICT {return seq_ != other.seq_;}
296  void* operator*() const HMBDC_RESTRICT {return buf_->getItem(seq_);}
297  template <typename T> T& get() const HMBDC_RESTRICT {return *static_cast<T*>(**this);}
298  template <typename T>
299  T* operator->() HMBDC_RESTRICT {return static_cast<T*>(buf_->getItem(seq_));}
300 //private:
301  BlockingBuffer* buf_;
302  Sequence seq_;
303 };
304 } //blocking_buffer_detail
305 
306 inline
307 size_t
308 BlockingBuffer::
309 peek(iterator& b, iterator& e) {
310  std::unique_lock<std::mutex> lck(mutex_);
311  e = iterator(this, seq_);
312  b = iterator(this, seq_ - size_);
313  return size_;
314 }
315 
316 inline
318 BlockingBuffer::
319 peek() {
320  std::unique_lock<std::mutex> lck(mutex_);
321  if (size_) {
322  return iterator(this, seq_ - size_);
323  }
324  return iterator();
325 }
326 }}
Definition: BlockingBuffer.hpp:16
Definition: Topic.hpp:44
Definition: Time.hpp:133
Definition: BlockingBuffer.hpp:253
Definition: Base.hpp:13