1 #include "hmbdc/Copyright.hpp" 3 #include "hmbdc/app/Base.hpp" 4 #include "hmbdc/text/Misc.h" 5 #include "hmbdc/numeric/BitMath.hpp" 6 #include "hmbdc/os/Signals.hpp" 12 #include <boost/program_options.hpp> 16 #define __GNUC_PREREQ(x, y) 0 17 char const*
const __progname_full = _cmdname(
nullptr);
19 extern const char *
const __progname_full;
28 namespace hmbdc {
namespace app {
namespace utils {
30 namespace netperf_detail {
47 friend std::ostream& operator << (std::ostream& os,
Message const& r) {
48 os <<
"Message " <<
' ' << r.len <<
' ' << r.seq;
51 char padding[65536 -
sizeof(seq) -
sizeof(len)];
52 } __attribute__((__packed__));
61 friend std::ostream& operator << (std::ostream& os,
MessageAtt const& r) {
62 os <<
"MessageAtt " <<
' ' << r.len <<
' ' << r.seq;
75 schedule(SysTime::now(), *
this);
78 virtual void report() = 0;
89 , periodicGoodMsgCount_(0)
90 , periodStartAt_(SysTime::now()) {
91 sender_ = NetContext::instance().getSender(topic);
92 if (msg_.len > 1000 && allowAtt) {
93 fileMap_.map(__progname_full);
94 if (fileMap_.len < msgSize) {
95 HMBDC_THROW(std::out_of_range,
"exceeds test allowed msg size <=" << fileMap_.len);
97 msgAtt_.attachment = fileMap_.attachment;
98 msgAtt_.len = msgSize;
100 msgAtt_.afterConsumedCleanupFunc =
nullptr;
105 if (msgAtt_.len > 1000 && allowAtt_) fileMap_.unmap();
107 char const* hmbdcName()
const {
114 if (hmbdc_likely(!allowAtt_ || msg_.len <= 1000u)) {
115 sender_->send(msg_, msg_.len);
117 periodicGoodMsgCount_++;
119 sender_->send(msgAtt_);
121 periodicGoodMsgCount_++;
126 std::cout << (e.what()) << std::endl;
129 void report()
override {
130 auto rate = size_t(periodicGoodMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
132 HMBDC_LOG_n(
"msgSize=", l,
" mps=", rate);
133 periodicGoodMsgCount_ = 0u;
134 periodStartAt_ = SysTime::now();
142 size_t periodicGoodMsgCount_;
147 void runInSenderMode(
Config & config,
Topic const& topic,
size_t msgSize
148 , vector<uint16_t> cpus,
bool allowAtt) {
152 auto engine = NetContext::instance().createSendTransportEngine(config
153 , msgSize > 1000 && allowAtt
157 ctx.
start(*engine, 1ul << cpus[0]
158 , client, 1ul << cpus[1]);
169 :
Client<ReceiverClient, Message, MessageAtt>
173 , periodicGoodMsgCount_(0)
174 , periodicGoodMsgBytes_(0)
175 , periodicMsgCount_(0)
176 , periodicMsgBytes_(0)
177 , periodStartAt_(SysTime::now())
183 , msgOverhead_(msgOverhead) {
186 char const* hmbdcName()
const {
191 void handleMessageCb(M
const& r) {
193 if (hmbdc_likely(r.seq > seq_)) {
194 periodicGoodMsgCount_++;
195 periodicGoodMsgBytes_ += msgOverhead_ + msgSize_;
196 if (hmbdc_likely(seq_)) periodicMissed_ += r.seq - seq_ - 1;
197 }
else if (r.seq == seq_) {
204 periodicMsgBytes_ += msgOverhead_ + msgSize_;
208 handleMessageCb<MessageAtt>(r);
213 std::cout << (e.what()) << std::endl;
216 void report()
override {
217 auto rateGood = size_t(periodicGoodMsgCount_ / ((SysTime::now() - periodStartAt_) / Duration::seconds(1)));
218 HMBDC_LOG_n(
"msgSize=", msgSize_,
" msgOverhead=", msgOverhead_,
" mps(good)=", rateGood,
" Mbps(good)=", periodicGoodMsgBytes_*8u >> 20u
219 ,
" missed=" , periodicMissed_ ,
'(' , periodicMissed_?periodicMissed_ * 100ul / (periodicMissed_ + periodicMsgCount_):0
220 ,
"%) outOfOrder=" , periodicOoo_
221 ,
" duplicate=" , periodicDup_
222 ,
" Mbps(all)=" , periodicMsgBytes_*8u >> 20u
225 periodicGoodMsgCount_ = 0u;
226 periodicGoodMsgBytes_ = 0u;
227 periodicMsgCount_ = 0;
228 periodicMsgBytes_ = 0u;
233 periodStartAt_ = SysTime::now();
237 size_t periodicGoodMsgCount_;
238 size_t periodicGoodMsgBytes_;
239 size_t periodicMsgCount_;
240 size_t periodicMsgBytes_;
243 size_t periodicMissed_;
250 template <
typename RecvCtorArgsTuple>
251 void runInReceiverMode(
Config const& config,
Topic const& topic,
size_t msgSize, uint16_t inBuferSizePower2
252 , vector<uint16_t> cpus, RecvCtorArgsTuple&& recvCtorArgs,
bool allowAtt) {
254 using namespace boost;
255 RecvCtx ctx(inBuferSizePower2, 1, msgSize > 1000 && allowAtt?
sizeof(
MessageAtt):msgSize);
258 auto& net = NetContext::instance();
260 auto engine = net.createRecvTransportEngineTuply(config, ctx.
buffer(), std::forward<RecvCtorArgsTuple>(recvCtorArgs));
261 auto msgOverhead = 8u +
sizeof(TransportMessageHeader) + topic.size();
264 ctx.
start(*engine, 1ul << cpus[0]
265 , client, 1ul << cpus[1]);
273 net.stopListenTo(topic);
276 template <
typename RecvCtorArgsTuple>
277 int startNetPerf(
char const* dftUserConfig,
char const* helpStrIn,
int argc,
char** argv
278 , RecvCtorArgsTuple&& recvCtorArgs,
bool allowAtt =
true) {
284 vector<uint16_t> cpus;
285 uint16_t inBuferSizePower2;
287 if (argc > 1) role = string(argv[1]);
288 if (role !=
string(
"--sender")
289 && role !=
string(
"--receiver")) {
290 cerr <<
"1st arg needs to be either --sender or --receiver" << endl;
296 bool isSender = role == string(
"--sender");
297 auto checkMsgSize = [](
size_t s) {
298 if (s < 8) HMBDC_THROW(out_of_range,
"msgSize too small (>=8)");
300 namespace po = boost::program_options;
301 po::options_description desc(
"additional allowed options (after --sender or --receiver)");
303 "This program can be used to test messaging performance among a group of hosts.\n" 304 "It can run as either a sender or a receiver. Normally one sender for a single interface in a test. \n" 305 "When Sender sends out traffic, receivers subscribe to the traffic and collect stats.\n" 306 "ctrl-c terminate.\n";
307 helpStr += string(helpStrIn);
309 (
"help", helpStr.c_str())
310 (
"cfg,c", po::value<std::string>(&userCfg),
"use the user provided config file and ignore the ones in command options")
311 (
"print",
"print effective cfg values")
312 (
"cpus", po::value(&cpus)->multitoken()->default_value({0, 1},
"0 1"),
"specify the 2 cpu index numbers used in the test (1 for engine and 1 for msg client)")
313 (
"msgSize", po::value<size_t>(&msgSize)->default_value(8u)->notifier(checkMsgSize),
"size of the message (8-size of this executable file). when > 1000 the message is sent using zero copy memory attachment if it is supported by the transport type and hmbdc posts no limit on attachment size")
314 (
"inBuferSizePower2", po::value<uint16_t>(&inBuferSizePower2)->default_value(0u),
"2^inBufferSizePower2 is the number of message that can be buffered in the program, default 0 means automatically caulcated based on 32MB as lower bound")
315 (
"topic", po::value<Topic>(&topic)->default_value(topic),
"topic used in the test");
317 Config dft(dftUserConfig, isSender?
"tx":
"rx");
319 for (
auto it = params.begin(); it != params.end();) {
320 string& name = it->first;
321 string& val = it->second;
323 string& comment = it->second;
326 (name.c_str(), po::value<string>(&val)->default_value(val), comment.c_str());
329 po::positional_options_description p;
330 po::variables_map vm;
331 po::store(po::command_line_parser(argc, argv).
332 options(desc).positional(p).run(), vm);
335 if (argc == 1 || vm.count(
"help") || msgSize < 8u || cpus.size() != 2) {
336 cout << desc <<
"\n";
341 if (userCfg.size()) {
342 std::istringstream cs(userCfg);
343 config =
Config(cs, isSender?
"tx":
"rx");
345 for (
auto& p : params) {
346 config.put(p.first, p.second);
349 if (msgSize > 1000 && allowAtt && !isSender) {
350 config.put(
"allowRecvMemoryAttachment",
"1002");
353 if (vm.count(
"print")) {
354 write_json(cout, config);
359 runInSenderMode(config, topic, msgSize, cpus, allowAtt);
361 if (!inBuferSizePower2) {
362 inBuferSizePower2 = hmbdc::numeric::log2Upper(32ul * 1024ul * 1024ul / (8ul + msgSize));
363 cout <<
"auto set --inBuferSizePower2=" << inBuferSizePower2 << endl;
365 runInReceiverMode(config, topic, msgSize, inBuferSizePower2
366 , cpus, forward<RecvCtorArgsTuple>(recvCtorArgs), allowAtt);
372 using netperf_detail::startNetPerf;
class to hold an hmbdc configuration
Definition: Config.hpp:46
void stop()
stop the message dispatching - asynchronously
Definition: Context.hpp:698
topic as in the publish / subscribe communication paradigm
Definition: Topic.hpp:14
void join()
wait until all threads (Pool threads too if apply) of the Context exit
Definition: Context.hpp:707
Definition: BitMath.hpp:6
Definition: NetPerf.hpp:168
Definition: Timers.hpp:66
each message type has 16 bit tag
Definition: Message.hpp:60
RAII representing the lifespan of the underlying Singleton which also ganrantees the singularity of u...
Definition: GuardedSingleton.hpp:20
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: NetPerf.hpp:212
void stoppedCb(std::exception const &e) override
callback called when this Client is taken out of message dispatching
Definition: NetPerf.hpp:125
static void onTermIntDo(std::function< void()> doThis)
specfy what to do when SIGTERM or SIGINT is received
Definition: Signals.hpp:27
Definition: NetPerf.hpp:54
Definition: Message.hpp:301
A Context is like a media object that facilitates the communications for the Clients that it is holdi...
Definition: Context.hpp:477
Definition: NetPerf.hpp:39
Definition: NetPerf.hpp:67
Buffer & buffer()
accessor - mostly used internally
Definition: Context.hpp:341
Definition: NetPerf.hpp:81
A Client represents a thread of execution/a task. The execution is managed by a Context. a Client object could participate in message dispatching as the receiver of specifed message types.
Definition: Client.hpp:57
void start(Args &&... args)
start the context by specifying what are in it (Pool and/or direct Clients) and their paired up cpu a...
Definition: Context.hpp:688
if a specific hmbdc network transport (for example tcpcast, rmcast, and rnetmap) supports message wit...
Definition: Message.hpp:225
void invokedCb(size_t) HMBDC_RESTRICT override
this callback is called all the time (frequently) - the exact timing is after a batch of messages are...
Definition: NetPerf.hpp:112
Definition: Timers.hpp:113
list< pair< string, string > > content(unordered_set< string > const &skipThese=unordered_set< string >()) const
get contents of all the effective configure in the form of list of string pairs
Definition: Config.hpp:373