hmbdc
simplify-high-performance-messaging-programming
NetPerf.hpp
1 #include "hmbdc/Copyright.hpp"
2 #pragma once
3 #include "hmbdc/app/Base.hpp"
4 #include "hmbdc/text/Misc.h"
5 #include "hmbdc/numeric/BitMath.hpp"
6 #include "hmbdc/os/Signals.hpp"
7 
8 #include <iostream>
9 #include <vector>
10 #include <memory>
11 #include <unistd.h>
12 #include <boost/program_options.hpp>
13 
14 #ifdef _QNX_SOURCE
15 #include <process.h>
16 #define __GNUC_PREREQ(x, y) 0
17 char const* const __progname_full = _cmdname(nullptr);
18 #else
19 extern const char * const __progname_full;
20 #endif
21 
22 
23 /**
24  * @namespace hmbdc::app::utils
25  * utilities based on app and serve its higher layers
26  */
27 
28 namespace hmbdc { namespace app { namespace utils {
29 
30 namespace netperf_detail {
31 using namespace tested_module;
32 using namespace hmbdc::time;
33 using namespace hmbdc::numeric;
34 using namespace boost::property_tree;
35 using namespace std;
36 
37 using SendCtx = hmbdc::app::Context<24>;
38 
39 struct Message
40 : hasTag<1001> {
41  Message(uint32_t len)
42  : len(len)
43  , seq(0)
44  {}
45  uint32_t len;
46  uint32_t seq;
47  friend std::ostream& operator << (std::ostream& os, Message const& r) {
48  os << "Message " << ' ' << r.len << ' ' << r.seq;
49  return os;
50  }
51  char padding[65536 - sizeof(seq) - sizeof(len)];
52 } __attribute__((__packed__));
53 
54 struct MessageAtt
56 , hasTag<1002> {
57  MessageAtt()
58  : seq(0)
59  {}
60  uint32_t seq;
61  friend std::ostream& operator << (std::ostream& os, MessageAtt const& r) {
62  os << "MessageAtt " << ' ' << r.len << ' ' << r.seq;
63  return os;
64  }
65 };
66 
69  : ReoccuringTimer(Duration::seconds(1u)) {
70  setCallback(
71  [this](TimerManager& tm, SysTime const& now) {
72  report();
73  }
74  );
75  schedule(SysTime::now(), *this);
76  }
77 
78  virtual void report() = 0;
79 };
80 
81 struct SenderClient
82 : Client<SenderClient>
84  SenderClient(SendCtx& ctx, Topic const topic, size_t msgSize, bool allowAtt)
85  : ctx_(ctx)
86  , allowAtt_(allowAtt)
87  , msg_(msgSize)
88  , msgAtt_()
89  , periodicGoodMsgCount_(0)
90  , periodStartAt_(SysTime::now()) {
91  sender_ = NetContext::instance().getSender(topic);
92  if (msg_.len > 1000 && allowAtt) {
93  fileMap_.map(__progname_full);
94  if (fileMap_.len < msgSize) {
95  HMBDC_THROW(std::out_of_range, "exceeds test allowed msg size <=" << fileMap_.len);
96  }
97  msgAtt_.attachment = fileMap_.attachment;
98  msgAtt_.len = msgSize;
99  //don't release memory, since we r reusing the same msfAtt_ everytime
100  msgAtt_.afterConsumedCleanupFunc = nullptr;
101  }
102  }
103 
104  ~SenderClient() {
105  if (msgAtt_.len > 1000 && allowAtt_) fileMap_.unmap();
106  }
107  char const* hmbdcName() const {
108  return "perf-tx";
109  }
110 
111  /*virtual*/
112  void invokedCb(size_t) HMBDC_RESTRICT override {
113  //send as fast as possible since the engine has rate control
114  if (hmbdc_likely(!allowAtt_ || msg_.len <= 1000u)) {
115  sender_->send(msg_, msg_.len);
116  msg_.seq++;
117  periodicGoodMsgCount_++;
118  } else {
119  sender_->send(msgAtt_);
120  msgAtt_.seq++;
121  periodicGoodMsgCount_++;
122  }
123  }
124 
125  void stoppedCb(std::exception const& e) override {
126  std::cout << (e.what()) << std::endl;
127  };
128 
129  void report() override {
130  auto rate = size_t(periodicGoodMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
131  auto l = msg_.len;
132  HMBDC_LOG_n("msgSize=", l, " mps=", rate);
133  periodicGoodMsgCount_ = 0u;
134  periodStartAt_ = SysTime::now();
135  }
136 
137  SendCtx& ctx_;
138  bool allowAtt_;
139  Sender* sender_;
140  Message msg_;
141  MessageAtt msgAtt_;
142  size_t periodicGoodMsgCount_;
143  SysTime periodStartAt_;
144  hasMemoryAttachment fileMap_;
145 };
146 
147 void runInSenderMode(Config & config, Topic const& topic, size_t msgSize
148  , vector<uint16_t> cpus, bool allowAtt) {
149  using namespace std;
150  SendCtx ctx;
152  auto engine = NetContext::instance().createSendTransportEngine(config
153  , msgSize > 1000 && allowAtt
154  ?std::max(sizeof(MessageAtt), sizeof(StartMemorySegTrain))
155  :msgSize);
156  SenderClient client(ctx, topic, msgSize, allowAtt);
157  ctx.start(*engine, 1ul << cpus[0]
158  , client, 1ul << cpus[1]);
160  [&ctx]() {
161  ctx.stop();
162  }
163  );
164  ctx.join();
165 }
166 
169 : Client<ReceiverClient, Message, MessageAtt>
171  ReceiverClient(RecvCtx& ctx, size_t msgOverhead)
172  : ctx_(ctx)
173  , periodicGoodMsgCount_(0)
174  , periodicGoodMsgBytes_(0)
175  , periodicMsgCount_(0)
176  , periodicMsgBytes_(0)
177  , periodStartAt_(SysTime::now())
178  , seq_(0)
179  , periodicMissed_(0)
180  , periodicOoo_(0)
181  , periodicDup_(0)
182  , msgSize_(0)
183  , msgOverhead_(msgOverhead) {
184  }
185 
186  char const* hmbdcName() const {
187  return "perf-rx";
188  }
189 
190  template<typename M>
191  void handleMessageCb(M const& r) {
192  msgSize_ = r.len;
193  if (hmbdc_likely(r.seq > seq_)) {
194  periodicGoodMsgCount_++;
195  periodicGoodMsgBytes_ += msgOverhead_ + msgSize_;
196  if (hmbdc_likely(seq_)) periodicMissed_ += r.seq - seq_ - 1;
197  } else if (r.seq == seq_) {
198  periodicDup_++;
199  } else {
200  periodicOoo_++;
201  }
202  seq_ = r.seq;
203  periodicMsgCount_++;
204  periodicMsgBytes_ += msgOverhead_ + msgSize_;
205  }
206 
207  void handleMessageCb(MessageAtt& r) {
208  handleMessageCb<MessageAtt>(r);
209  r.release();
210  }
211 
212  void stoppedCb(std::exception const& e) override {
213  std::cout << (e.what()) << std::endl;
214  };
215 
216  void report() override {
217  auto rateGood = size_t(periodicGoodMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
218  HMBDC_LOG_n("msgSize=", msgSize_, " msgOverhead=", msgOverhead_, " mps(good)=", rateGood, " Mbps(good)=", periodicGoodMsgBytes_*8u >> 20u
219  , " missed=" , periodicMissed_ , '(' , periodicMissed_?periodicMissed_ * 100ul / (periodicMissed_ + periodicMsgCount_):0
220  , "%) outOfOrder=" , periodicOoo_
221  , " duplicate=" , periodicDup_
222  , " Mbps(all)=" , periodicMsgBytes_*8u >> 20u
223  );
224 
225  periodicGoodMsgCount_ = 0u;
226  periodicGoodMsgBytes_ = 0u;
227  periodicMsgCount_ = 0;
228  periodicMsgBytes_ = 0u;
229  periodicMissed_ = 0;
230  periodicDup_ = 0;
231  periodicOoo_ = 0;
232 
233  periodStartAt_ = SysTime::now();
234  }
235 
236  RecvCtx& ctx_;
237  size_t periodicGoodMsgCount_;
238  size_t periodicGoodMsgBytes_;
239  size_t periodicMsgCount_;
240  size_t periodicMsgBytes_;
241  SysTime periodStartAt_;
242  size_t seq_;
243  size_t periodicMissed_;
244  size_t periodicOoo_;
245  size_t periodicDup_;
246  size_t msgSize_;
247  size_t msgOverhead_;
248 };
249 
250 template <typename RecvCtorArgsTuple>
251 void runInReceiverMode(Config const& config, Topic const& topic, size_t msgSize, uint16_t inBuferSizePower2
252  , vector<uint16_t> cpus, RecvCtorArgsTuple&& recvCtorArgs, bool allowAtt) {
253  using namespace std;
254  using namespace boost;
255  RecvCtx ctx(inBuferSizePower2, 1, msgSize > 1000 && allowAtt?sizeof(MessageAtt):msgSize);
256 
258  auto& net = NetContext::instance();
259 
260  auto engine = net.createRecvTransportEngineTuply(config, ctx.buffer(), std::forward<RecvCtorArgsTuple>(recvCtorArgs));
261  auto msgOverhead = 8u + sizeof(TransportMessageHeader) + topic.size();
262  ReceiverClient client(ctx, msgOverhead);
263  net.listenTo(topic);
264  ctx.start(*engine, 1ul << cpus[0]
265  , client, 1ul << cpus[1]);
267  [&ctx]() {
268  ctx.stop();
269  }
270  );
271  ctx.join();
272 
273  net.stopListenTo(topic);
274 }
275 
276 template <typename RecvCtorArgsTuple>
277 int startNetPerf(char const* dftUserConfig, char const* helpStrIn, int argc, char** argv
278  , RecvCtorArgsTuple&& recvCtorArgs, bool allowAtt = true) {
279  SingletonGuardian<SyncLogger> logGuard(cout);
280  using namespace std;
281  string role;
282  string userCfg;
283  size_t msgSize;
284  vector<uint16_t> cpus;
285  uint16_t inBuferSizePower2;
286  Topic topic("t");
287  if (argc > 1) role = string(argv[1]);
288  if (role != string("--sender")
289  && role != string("--receiver")) {
290  cerr << "1st arg needs to be either --sender or --receiver" << endl;
291  exit(1);
292  }
293  argc--;
294  argv++;
295 
296  bool isSender = role == string("--sender");
297  auto checkMsgSize = [](size_t s) {
298  if (s < 8) HMBDC_THROW(out_of_range, "msgSize too small (>=8)");
299  };
300  namespace po = boost::program_options;
301  po::options_description desc("additional allowed options (after --sender or --receiver)");
302  string helpStr =
303  "This program can be used to test messaging performance among a group of hosts.\n"
304  "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n"
305  "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n"
306  "ctrl-c terminate.\n";
307  helpStr += string(helpStrIn);
308  desc.add_options()
309  ("help", helpStr.c_str())
310  ("cfg,c", po::value<std::string>(&userCfg), "use the user provided config file and ignore the ones in command options")
311  ("print", "print effective cfg values")
312  ("cpus", po::value(&cpus)->multitoken()->default_value({0, 1}, "0 1"), "specify the 2 cpu index numbers used in the test (1 for engine and 1 for msg client)")
313  ("msgSize", po::value<size_t>(&msgSize)->default_value(8u)->notifier(checkMsgSize), "size of the message (8-size of this executable file). when > 1000 the message is sent using zero copy memory attachment if it is supported by the transport type and hmbdc posts no limit on attachment size")
314  ("inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u), "2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
315  ("topic", po::value<Topic>(&topic)->default_value(topic), "topic used in the test");
316 
317  Config dft(dftUserConfig, isSender?"tx":"rx");
318  auto params = dft.content();
319  for (auto it = params.begin(); it != params.end();) {
320  string& name = it->first;
321  string& val = it->second;
322  it++;
323  string& comment = it->second;
324  it++;
325  desc.add_options()
326  (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
327  }
328 
329  po::positional_options_description p;
330  po::variables_map vm;
331  po::store(po::command_line_parser(argc, argv).
332  options(desc).positional(p).run(), vm);
333  po::notify(vm);
334 
335  if (argc == 1 || vm.count("help") || msgSize < 8u || cpus.size() != 2) {
336  cout << desc << "\n";
337  return 0;
338  }
339 
340  Config config;
341  if (userCfg.size()) {
342  std::istringstream cs(userCfg);
343  config = Config(cs, isSender?"tx":"rx");
344  } else {
345  for (auto& p : params) {
346  config.put(p.first, p.second);
347  }
348  }
349  if (msgSize > 1000 && allowAtt && !isSender) {
350  config.put("allowRecvMemoryAttachment", "1002");
351  }
352 
353  if (vm.count("print")) {
354  write_json(cout, config);
355  exit(0);
356  }
357 
358  if (isSender) {
359  runInSenderMode(config, topic, msgSize, cpus, allowAtt);
360  } else {
361  if (!inBuferSizePower2) {
362  inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
363  cout << "auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
364  }
365  runInReceiverMode(config, topic, msgSize, inBuferSizePower2
366  , cpus, forward<RecvCtorArgsTuple>(recvCtorArgs), allowAtt);
367  }
368  return 0;
369 }
370 } //netperf_detail
371 
372 using netperf_detail::startNetPerf;
373 }}}
374 
class to hold an hmbdc configuration
Definition: Config.hpp:46
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:698
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:707
Definition: Topic.hpp:44
Definition: BitMath.hpp:6
Definition: Timers.hpp:66
each message type has 16 bit tag
Definition: Message.hpp:60
RAII representing the lifespan of the underlying Singleton which also ganrantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: NetPerf.hpp:212
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: NetPerf.hpp:125
static void onTermIntDo(std::function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: Message.hpp:301
Definition: Time.hpp:14
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:477
Definition: Rater.hpp:10
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:341
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
void start(Args &&... args)
start the context by specifying what are in it (Pool and/or direct Clients) and their paired up cpu a...
Definition: Context.hpp:688
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:225
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: NetPerf.hpp:112
Definition: Base.hpp:13
Definition: Timers.hpp:113
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get contents of all the effective configure in the form of list of string pairs
Definition: Config.hpp:373