hmbdc
simplify-high-performance-messaging-programming
 All Classes Namespaces Functions Variables Friends Pages
SeqArb.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 
4 
5 #include "hmbdc/Exception.hpp"
6 #include "hmbdc/Compile.hpp"
7 #include "hmbdc/Config.hpp"
8 #include <stdexcept>
9 #include <utility>
10 #include <inttypes.h>
11 #include <limits>
12 
13 
14 namespace hmbdc { namespace pattern {
15 
16 namespace seqarb_detail {
17 using namespace std;
// These helpers intentionally have ordinary (external) linkage. They used to
// live in an unnamed namespace, which in a header gives every translation unit
// its own private copy; the inline SeqArb members calling them would then
// refer to different entities in different TUs (an ODR violation). Being inline
// function templates, they can be defined in every TU without conflict.

/// Issues a full memory barrier (__sync_synchronize) when THREADSAFE is true;
/// does nothing otherwise.
template <bool THREADSAFE>
inline __attribute__ ((always_inline))
void
my__sync_synchronize() {
    if (THREADSAFE) __sync_synchronize();
}

/// Compare-and-swap that reports whether the swap happened.
/// @tparam THREADSAFE true: atomic GCC builtin __sync_bool_compare_and_swap;
///         false: plain read-compare-write, only valid when no other thread
///         accesses *var concurrently.
/// @param var     location to update
/// @param compVal value *var must hold for the swap to take place
/// @param newVal  value stored into *var on success
/// @return true if *var equaled compVal and now holds newVal
template <bool THREADSAFE, typename T>
inline __attribute__ ((always_inline))
bool
my__sync_bool_compare_and_swap(volatile T* var, T compVal, T newVal) {
    if (THREADSAFE) return __sync_bool_compare_and_swap(var, compVal, newVal);
    if (*var != compVal) return false;
    *var = newVal;
    return true;
}

/// Compare-and-swap that reports the value *var held before the operation.
/// @tparam THREADSAFE true: atomic GCC builtin __sync_val_compare_and_swap;
///         false: plain read-compare-write, only valid when no other thread
///         accesses *var concurrently.
/// @param var     location to update
/// @param compVal value *var must hold for the swap to take place
/// @param newVal  value stored into *var on success
/// @return the previous content of *var (equal to compVal iff the swap happened)
template <bool THREADSAFE, typename T>
inline __attribute__ ((always_inline))
T
my__sync_val_compare_and_swap(volatile T* var, T compVal, T newVal) {
    if (THREADSAFE) return __sync_val_compare_and_swap(var, compVal, newVal);
    T const previous = *var;  // single volatile read; the comparison uses this snapshot
    if (previous == compVal) *var = newVal;
    return previous;
}
51 
52 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t, bool THREADSAFE = true>
53 struct SeqArb {
54  explicit SeqArb(Seq startSeq = 0)
55  : missedInit_(~((1ul << PARTICIPANT_COUNT) - 1u))
56  , gapFuncLock_(true)
57  , seq_(startSeq) {
58  static_assert(PARTICIPANT_COUNT <= 64u, "too many participants");
59  }
60 
61  SeqArb(SeqArb const&) = delete;
62  SeqArb& operator = (SeqArb const&) = delete;
63 
64  template <typename HitFunc, typename GapFunc>
65  inline __attribute__ ((always_inline))
66  bool operator()(uint16_t participantIndex, Seq seq, HitFunc&& h, GapFunc&& g) HMBDC_RESTRICT {
67  j_[participantIndex].seq = seq;
68 
69  auto old = my__sync_val_compare_and_swap<THREADSAFE>(&seq_, seq, std::numeric_limits<Seq>::max());
70 
71  if (old == seq) {
72  h();
73  my__sync_synchronize<THREADSAFE>();
74  seq_ = seq + 1;
75  return true; //arbitrated (utilized or discarded)
76  } else if (old == std::numeric_limits<Seq>::max()) {
77  return false ;
78  } else if (seq < old) {
79  return true; //arbitrated (discard)
80  } else { //seq > old
81  auto low = jLow();
82  if (hmbdc_unlikely((low > old && seq_ == old &&
83  my__sync_bool_compare_and_swap<THREADSAFE>(&seq_, old, std::numeric_limits<Seq>::max())))) {
84  g(low - old);
85  my__sync_synchronize<THREADSAFE>();
86  seq_ = low;
87  }
88  return false; //cannot decide and ask me later
89  }
90  }
91 
92  template <typename SeqGen, typename HitFunc, typename GapFunc>
93  inline __attribute__ ((always_inline))
94  size_t operator()(uint16_t participantIndex, SeqGen&& seqGen
95  , size_t seqSize, HitFunc&& h, GapFunc&& g) HMBDC_RESTRICT {
96  auto seq = seqGen();
97  j_[participantIndex].seq = seq;
98 
99  auto old = my__sync_val_compare_and_swap<THREADSAFE>(
100  &seq_, seq, std::numeric_limits<Seq>::max());
101 
102  if (old == seq) {
103  h();
104  auto preSeq = seq;
105  auto s = 1ul;
106  for (; s < seqSize; ++s) {
107  seq = seqGen();
108  if (seq - 1 == preSeq) {
109  preSeq = seq;
110  h();
111  } else {
112  break;
113  }
114  }
115  j_[participantIndex].seq = seq;
116  my__sync_synchronize<THREADSAFE>();
117  seq_ = seq + s;
118  return s;
119  } else if (old == std::numeric_limits<Seq>::max()) {
120  return 0;
121  } else if (seq < old) {
122  return 1ul; //arbitrated (discard)
123  } else { //seq > old
124  auto low = jLow();
125  if (hmbdc_unlikely((low > old && seq_ == old &&
126  my__sync_bool_compare_and_swap<THREADSAFE>(
127  &seq_, old, std::numeric_limits<Seq>::max())))) {
128  g(low - old);
129  my__sync_synchronize<THREADSAFE>();
130  seq_ = low;
131  }
132  return 0; //cannot decide and ask me later
133  }
134  }
135 
136  volatile Seq const& expectingSeq() const {
137  return seq_;
138  }
139 
140  volatile Seq& expectingSeq() {
141  return seq_;
142  }
143 
144 private:
145  inline __attribute__ ((always_inline))
146  Seq jLow() const HMBDC_RESTRICT {
147  auto res = j_[0].seq;
148  for (auto i = 1u; i < PARTICIPANT_COUNT; ++i)
149  if (res > j_[i].seq) res = j_[i].seq;
150  return res;
151  }
152 
153  uint64_t const missedInit_;
154  bool gapFuncLock_;
155  volatile Seq seq_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
156  uint64_t missed_ __attribute__((__aligned__(SMP_CACHE_BYTES)));
157  struct J {
158  J() : seq(0u){}
159  Seq seq;
160  } __attribute__((__aligned__(SMP_CACHE_BYTES)));
161  J j_[PARTICIPANT_COUNT];
162 };
163 
164 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t>
// NOTE(review): the embedded listing numbers skip 165 and 193, so this copy is
// missing the type's opening line (presumably `struct SingleThreadSeqArb {`,
// given the constructor name below) and the declaration of the private member
// `arb_`. From the calls below, arb_ is presumably a SeqArb instance (likely
// with THREADSAFE == false, given the wrapper's name) -- confirm against the
// original header.
//
// Convenience wrapper that offers one sequence number at a time to arb_ and
// folds arb_'s bool result plus its hit callback into a tri-state int.
// NOTE(review): unlike SeqArb's constructor, this one is not `explicit`.
166  SingleThreadSeqArb(Seq startSeq = 0)
167  : arb_(startSeq){}
168 
 // Offers `seq` on behalf of participant `participantIndex`; gapFunc is
 // forwarded to arb_ as its gap callback.
 // Returns:
 //    1 - arb_ invoked the hit callback (seq was the sequence it expected);
 //    0 - arb_ returned false (undecided; offer seq again later);
 //   -1 - arb_ returned true without invoking the hit callback (presumably
 //        seq was older than expected and discarded -- confirm against arb_).
169  template <typename GapFunc>
170  int operator()(uint16_t participantIndex, Seq seq, GapFunc&& gapFunc) {
171  int res = -1;
172  if (!arb_(participantIndex, seq
173  , [&res]() {
174  res = 1;
175  }
176  , forward<GapFunc>(gapFunc)
177  )
178  ) {
179  res = 0;
180  }
181  return res;
182  }
183 
 // Forwards to arb_.expectingSeq().
184  volatile Seq& expectingSeq() {
185  return arb_.expectingSeq();
186  }
187 
 // Forwards to arb_.expectingSeq() (read-only access).
188  volatile Seq const& expectingSeq() const {
189  return arb_.expectingSeq();
190  }
191 
192 private:
194 };
195 
196 } //seqarb_detail
197 
198 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t, bool THREADSAFE = true>
200 
201 template <uint16_t PARTICIPANT_COUNT, typename Seq = uint64_t>
203 
204 }}
Definition: SeqArb.hpp:53