hmbdc
simplify-high-performance-messaging-programming
EpollTask.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/pattern/GuardedSingleton.hpp"
4 #include "hmbdc/Config.hpp"
5 #include "hmbdc/Compile.hpp"
6 #include <memory>
7 #include <algorithm>
8 #include <atomic>
9 #include <mutex>
10 #include <unistd.h>
11 
12 #ifdef HMBDC_NO_EPOLL
13 #include <poll.h>
14 #else
15 #include <sys/epoll.h>
16 #endif
17 
18 namespace hmbdc { namespace app { namespace utils {
19 
20 struct EpollFd;
21 #ifndef HMBDC_NO_EPOLL
22 struct EpollTask
23 : pattern::GuardedSingleton<EpollTask> {
24  enum {
25  EPOLLIN = ::EPOLLIN,
26  EPOLLOUT = ::EPOLLOUT,
27  EPOLLET = ::EPOLLET
28  };
29 
30  static void initialize() {
32  }
33 
34  void setWaitTime(int t) {
35  timeoutMillisec_ = t;
36  }
37 
38  void add(uint32_t events, EpollFd&);
39  bool del(EpollFd&);
40  void poll();
41 
42 private:
44  friend EpollFd;
45  EpollTask(size_t maxFdCount = HMBDC_EPOLL_FD_MAX);
46  ~EpollTask();
47  void setPollPending() { pollPending_ = true; }
48 
49  bool stopped_;
50  int epollFd_;
51  std::atomic<bool> pollPending_ = false;
52  // std::thread pollThread_;
53  int timeoutMillisec_ = 0;
54 };
55 #else
56 struct EpollTask
57 : pattern::GuardedSingleton<EpollTask> {
58  enum {
59  EPOLLIN = POLLIN,
60  EPOLLOUT = POLLOUT,
61  EPOLLET = 0
62  };
63  static void initialize() {
65  }
66 
67  void add(uint32_t events, EpollFd&);
68  bool del(EpollFd&);
69  void poll();
70 
71 private:
73  friend EpollFd;
74  void setPollPending() {}
75  EpollTask(size_t maxFdCount = HMBDC_EPOLL_FD_MAX);
76 
77  ~EpollTask();
78 
79  bool stopped_;
80  size_t maxFdCount_;
81  ::pollfd* pollFds_;
82  bool** sentinels_;
83  std::mutex lock_;
84 };
85 #endif
86 
87 struct EpollFd {
88  EpollFd(EpollFd const&) = delete;
89  EpollFd& operator = (EpollFd const&) = delete;
90  EpollFd()
91  : fd(-1)
92  , fdReady_(false)
93  , fdReadyLocal_(false)
94  {}
95 
96  virtual
97  ~EpollFd() {
98  if (fd > 0) {
99  utils::EpollTask::instance().del(*this);
100  close(fd);
101  }
102  }
103 
104  bool isFdReady() {
105  if (hmbdc_likely(fdReadyLocal_)) {
106  return true;
107  } else {
108  EpollTask::instance().setPollPending();
109  // return fdReadyLocal_ = __sync_val_compare_and_swap(&fdReady_, true, false);
110  return fdReadyLocal_ = __atomic_exchange_n(&fdReady_, false, __ATOMIC_RELAXED);
111  }
112  }
113 
114  bool checkErr() {
115  fdReadyLocal_ = false;
116  if (errno != EAGAIN) {
117  if (!utils::EpollTask::instance().del(*this)) {
118  } else {
119  close(fd);
120  fd = -1;
121  }
122  return false;
123  }
124  return true;
125  }
126 
127  int fd;
128 
129 private:
130  friend struct EpollTask;
131  bool fdReady_;
132  bool fdReadyLocal_;
133 };
134 }}}
135 
136 #ifndef HMBDC_NO_EPOLL
137 #include <sys/epoll.h>
138 namespace hmbdc { namespace app { namespace utils {
139 
140 inline
141 EpollTask::
142 EpollTask(size_t maxFdCount)
143 : stopped_(false) {
144  epollFd_ = epoll_create(maxFdCount);
145  if (epollFd_ == -1) {
146  HMBDC_THROW(std::runtime_error, "cannot create epoll errno=" << errno);
147  }
148 }
149 
150 inline
151 void
152 EpollTask::
153 add(uint32_t events, EpollFd& t) {
154  struct epoll_event ev;
155  ev.events = events;
156  ev.data.ptr = &t.fdReady_;
157  if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, t.fd, &ev) == -1) {
158  HMBDC_THROW(std::runtime_error, "cannot add into epoll, errno=" << errno);
159  }
160 }
161 
162 inline
163 bool
164 EpollTask::
165 del(EpollFd& t) {
166  return epoll_ctl(epollFd_, EPOLL_CTL_DEL, t.fd, NULL) != -1;
167 }
168 
169 inline
170 void
171 EpollTask::
172 poll() {
173  if (!pollPending_) return;
174  struct epoll_event events[HMBDC_EPOLL_FD_MAX];
175  int nfds;
176  nfds = epoll_wait(epollFd_, events, HMBDC_EPOLL_FD_MAX, timeoutMillisec_);
177  if (hmbdc_unlikely(nfds == -1 && errno != EINTR)) {
178  HMBDC_THROW(std::runtime_error, "epoll_pwait errno=" << errno);
179  }
180  for (int i = 0; i < nfds; ++i) {
181  auto& sentinel = *static_cast<bool*>(events[i].data.ptr);
182  sentinel = true;
183  }
184  pollPending_ = false;
185  if (nfds > 0) __atomic_thread_fence(__ATOMIC_RELEASE);
186 }
187 
188 inline
189 EpollTask::
190 ~EpollTask() {
191  close(epollFd_);
192  stopped_ = true;
193  // pollThread_.join();
194 }
195 
196 }}}
197 
198 #else
199 
200 namespace hmbdc { namespace app { namespace utils {
201 
202 inline
203 EpollTask::
204 EpollTask(size_t maxFdCount)
205 : stopped_(false)
206 , maxFdCount_(maxFdCount)
207 , pollFds_(new ::pollfd[maxFdCount])
208 , sentinels_(new bool* [maxFdCount]) {
209  for (auto i = 0u; i < maxFdCount; ++i) {
210  pollFds_[i].fd = -1;
211  sentinels_[i] = nullptr;
212  }
213 }
214 
215 inline
216 void
217 EpollTask::
218 add(uint32_t events, EpollFd& t) {
219  std::lock_guard<std::mutex> g(lock_);
220  auto slot = std::find_if(pollFds_, pollFds_ + maxFdCount_
221  , [](pollfd& pf){return pf.fd < 0;});
222  if (slot != pollFds_ + maxFdCount_) {
223  slot->fd = t.fd;
224  slot->events = (short)events;
225  slot->revents = 0;
226  sentinels_[slot - pollFds_] = &t.fdReady_;
227  } else {
228  HMBDC_THROW(std::runtime_error, "cannot add into poll, errno=" << errno);
229  }
230 }
231 
232 inline
233 bool
234 EpollTask::
235 del(EpollFd& t) {
236  std::lock_guard<std::mutex> g(lock_);
237  auto slot = std::find_if(pollFds_, pollFds_ + maxFdCount_
238  , [fd = t.fd](pollfd& pf){return pf.fd == fd;});
239  if (slot != pollFds_ + maxFdCount_) {
240  slot->fd = -slot->fd;
241  sentinels_[slot - pollFds_] = nullptr;
242  return true;
243  } else {
244  return false;
245  }
246 }
247 
248 inline
249 void
250 EpollTask::
251 poll() {
252  if (!lock_.try_lock()) return;
253  auto nfds = ::poll(pollFds_, (nfds_t)maxFdCount_, 0);
254  for (auto i = 0u; nfds > 0 && i < maxFdCount_; ++i) {
255  if (pollFds_[i].revents) {
256  *sentinels_[i] = true;
257  nfds--;
258  }
259  }
260  lock_.unlock();
261 
262  if (hmbdc_unlikely(nfds< 0 && errno != EINTR)) {
263  HMBDC_THROW(std::runtime_error, "poll errno=" << errno);
264  }
265 }
266 
267 inline
268 EpollTask::
269 ~EpollTask() {
270  stopped_ = true;
271  delete []sentinels_;
272  delete []pollFds_;
273 }
274 
275 }}}
276 
277 #endif
base for the Singleton that works with SingletonGuardian
Definition: GuardedSingleton.hpp:53
RAII representing the lifespan of the underlying Singleton which also ganrantees the singularity of u...
Definition: GuardedSingleton.hpp:20
Definition: EpollTask.hpp:22
Definition: EpollTask.hpp:87
Definition: Base.hpp:12