hmbdc
simplify-high-performance-messaging-programming
PoolConsumer.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 #include "hmbdc/pattern/LockFreeBufferMisc.hpp"
5 #include "hmbdc/time/Timers.hpp"
6 #include "hmbdc/Exception.hpp"
7 #include "hmbdc/Config.hpp"
8 
9 #include <stdexcept>
10 
11 namespace hmbdc { namespace pattern {
12 
13 struct PoolConsumer {
14  friend struct PoolImpl;
15 
16  template <typename Buffer>
17  friend struct PoolTImpl;
18  friend struct PoolMinusImpl;
19  explicit PoolConsumer(bool interestedInMessages, time::TimerManager* tmPtr);
20  virtual ~PoolConsumer();
21  void stopped(std::exception const&) noexcept;
22  bool dropped() noexcept;
23  void messageDispatchingStarted(uint16_t threadId) {
24  if (hmbdc_unlikely(!messageDispatchingStarted_)) {
25  messageDispatchingStartedCb(threadId);
26  messageDispatchingStarted_ = true;
27  }
28  }
29  void invoked(size_t);
30 
31 protected:
33 
34 private:
35  void reset();
36  bool handleRange(BufIt begin,
37  BufIt end, uint16_t threadId) noexcept;
38  bool handleInvokeOnly(uint16_t threadId) noexcept;
39  virtual void messageDispatchingStartedCb(uint16_t threadId) {}
40  virtual size_t handleRangeImpl(BufIt begin,
41  BufIt end, uint16_t threadId){ return 0; };
42  virtual void invokedCb(size_t) {}
43  virtual void stoppedCb(std::exception const&) {}
44  virtual bool droppedCb() { return true; }
45 
47  uint64_t poolThreadAffinity;
48  uint16_t droppedCount;
49  uint64_t skippedPoolThreadMask_;
50  HMBDC_SEQ_TYPE nextSeq_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
51  bool interestedInMessages;
52  bool messageDispatchingStarted_;
53 };
54 
55 }} // end namespace hmbdc::pattern
Definition: Timers.hpp:66
Definition: PoolConsumer.hpp:13
Definition: Base.hpp:13
Definition: LockFreeBufferMisc.hpp:89