hmbdc
simplify-high-performance-messaging-programming
server-cluster.cpp

A partition Context rightfully doesn't contain a thread pool, and all its Clients are in direct mode. Pool-related interfaces are turned off at compile time.

//this example is to show how to use hmbdc to write simple scalable high performance client/server programs
//in a server cluster environment
//
//a client sends a single word per request and asks the word gets reversed in the response from one of
//the servers in a cluster.
//requests are load-balanced (by the first char in the word) across a cluster of server hosts,
//the example shows just one server (with multiple worker cores) for demo purpose
//
//to build: (newer compilers might require -faligned-new compile flag)
//g++ server-cluster.cpp -g -O3 -std=c++1y -Wall -Werror -pthread -I path-to-boost -I /opt/hmbdc/include/ /opt/hmbdc/lib/libhmbdc.a /usr/local/lib/libboost_system.a /usr/local/lib/libboost_iostreams.a -lpthread -lrt -o /tmp/server-cluster
//
// In the scenario of 1 client (using 3 cores) and 1 server (using 5 cores), we send every word in
// /usr/share/dict/words 30 times to saturate the system, we get 479828*30=14394840 results back
// peak transactions per sec reaches well above 5M tps (lossless).
// see the screen shots below:
/*
[server]$ /tmp/server-cluster 192.168.0.101/24
-------------------------------------------------------------------------------------
[client]$ cat /usr/share/dict/words| /tmp/server-cluster 192.168.0.101/24 30 #repeat each request 30 times
all requests sent, waiting for results, press ctrl-c to terminate
^C
result count = 14394840
[client]$ wc -l /usr/share/dict/words
479828 /usr/share/dict/words
-------------------------------------------------------------------------------------
[server]$ /tmp/server-cluster 192.168.0.101/24
^C
worker 0 peak tps=1787943
worker 1 peak tps=1824907
worker 2 peak tps=1780517
//
//alternatively, we could use the console to act as the client:
//
//[client]$ for w in `cat /usr/share/dict/words /usr/share/dict/words /usr/share/dict/words`; do echo sendstr 1005 $w; done|./console-udpcast examples/server-cluster-client.json > /tmp/res
*/
#include <hmbdc/app/udpcast/NetContext.hpp> //use udp multicast for communication
// #include "hmbdc/app/rmcast/NetContext.hpp" // or use reliable udp multicast - rmcast, same performance
#include <hmbdc/time/Timers.hpp> //we count transactions in every sec interval
#include <hmbdc/os/Signals.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <algorithm>
#include <string>
#include <memory>
#include <unistd.h>
using namespace std;
using namespace boost;
using namespace hmbdc::time;
using namespace hmbdc::app;
using namespace hmbdc::app::udpcast;
// using namespace hmbdc::app::rmcast;
//topic the client publishes requests on (the server calls listenTo on it below)
Topic REQUEST_TOPIC("req");
//request message: one word to reverse; tag 1005 identifies it on the wire
struct Request
: hasTag<1005> {
char word[120]; //expected to be null terminated; server bounds reads with strnlen anyway
};
//topic the servers publish results on (the client calls listenTo on it below)
Topic RESULT_TOPIC("res");
//response message: the reversed word; tag 1004 identifies it on the wire
struct Result
: hasTag<1004> {
char reversedWord[120]; //null terminated by the worker before sending
};
//transport configuration used by both client and server:
// "request" section - multicast group/port that carries Request messages
// "result" section - multicast group/port that carries Result messages
//top-level entries (e.g. outBufferSizePower2) apply to both transports;
//"ifaceAddr" is injected at runtime from argv[1] in main()
auto CONFIG = R"|(
{
"outBufferSizePower2" : 13,
"request" : {
"topicRegex" : "req",
"udpcastDests" : "232.43.211.234:4321",
"udpcastListenPort" : 4321,
"udpcastListenAddr" : "232.43.211.234"
},
"result" : {
"topicRegex" : "res",
"udpcastDests" : "232.43.211.235:4321",
"udpcastListenPort" : 4321,
"udpcastListenAddr" : "232.43.211.235"
}
}
)|";
//single alias for the net context type, so switching to rmcast only
//requires changing the include/using lines at the top plus this one
using NetCtx = udpcast::NetContext;
//client side Context, sized for the largest message it receives (Result);
//uses the default context properties (no partition needed on the client side)
using ClientContext = Context<sizeof(Result)>;
/**
 * @brief client-side Client: pumps words from stdin out as Requests and
 * counts the Results that come back
 */
struct WordRequester
: Client<WordRequester, Result> {
    WordRequester()
    : sender_(NetCtx::instance().getSender(REQUEST_TOPIC)) {
    }

    /**
     * @brief read stdin line by line and publish each word dup times
     * @param dup how many copies of each word to send
     */
    void sendRequests(size_t dup) {
        for (string line; getline(cin, line);) {
            //words vary in length, so use sendBytes and send only the used
            //portion (word + null terminator, capped at the field size)
            auto len = min(line.size() + 1, sizeof(Request::word));
            for (size_t rep = 0; rep != dup; ++rep) {
                sender_->sendBytes(Request::typeTag, line.c_str(), len);
            }
        }
    }

    //called by the context for each Result received from the server cluster
    void handleMessageCb(Result const& m) {
        ++resultCount_;
        // cout << m.reversedWord << endl;
    }

    Sender* sender_;
    size_t resultCount_ = 0; //read by runClient after ctx.join()
};
/**
 * @brief run in client mode
 *
 * sends every word read from stdin msgDupCount times as a Request and counts
 * the Results until the process is stopped by SIGTERM/SIGINT
 * @param config hmbdc Config holding the "request" and "result" sections
 * @param msgDupCount how many times each word is sent
 */
void runClient(Config const& config, size_t msgDupCount) {
    ClientContext ctx; //context to power network transports and other things
    SingletonGuardian<NetCtx> g; //RAII for udpcast::NetContext resources
    auto& net = NetCtx::instance();
    //we send out messages, need a send engine
    Config requestConfig(config.get_child("request"), config);
    auto sengine = net.createSendTransportEngine(requestConfig
        , sizeof(Request)); //maximum size of the message the engine can send out
    //we receive messages, need a receive engine
    Config resultConfig(config.get_child("result"), config);
    auto rengine = net.createRecvTransportEngine(resultConfig
        , ctx.buffer()); //received messages go to ctx so WordRequester can see
    WordRequester wr;
    ctx.start(*sengine, 0x01ul
        , *rengine, 0x02ul
        , wr, 0x04ul);
    net.listenTo(RESULT_TOPIC);
    //send requests
    wr.sendRequests(msgDupCount);
    //requests are sent, wait long enough to receive all the results;
    //NOTE(review): restored the HandleSignals call the lambda belongs to -
    //it was dropped in a bad merge/paste, leaving an orphan lambda (syntax error);
    //stop the context when SIGTERM or SIGINT arrives
    hmbdc::os::HandleSignals::onTermIntDo(
        [&ctx] {
            ctx.stop();
        }
    );
    //wait until all results processed
    cerr << "all requests sent, waiting for results, press ctrl-d, ctrl-c to terminate" << endl;
    ctx.join();
    cout << "\nresult count = " << wr.resultCount_ << endl;
}
/**
* @brief a worker (thread)
*/
struct WordReverser
: Client<WordReverser, Request>
//NOTE(review): the base-class list below was truncated in a bad paste - the
//ctor initializes ReoccuringTimer and calls setCallback()/schedule(), which
//requires deriving from both timer types (headers come from hmbdc/time/Timers.hpp)
, TimerManager
, ReoccuringTimer {
    //a worker: reverses each incoming word, publishes the Result, and tracks
    //its peak transactions-per-second with a 1 sec reoccurring timer
    WordReverser()
    : ReoccuringTimer(Duration::seconds(1u)) //count transactions every sec
    , sender_(NetCtx::instance().getSender(RESULT_TOPIC))
    , periodTps_(0)
    , peakTps_(0) {
        setCallback( //call this when timer fires
            [this](TimerManager& tm, SysTime const& now) {
                record();
            }
        );
        schedule(SysTime::now(), *this); //start the timer immediately
    }
    //although not obvious in this example, all callbacks of a particular Client
    //are guaranteed thread-safe (i.e. not called concurrently) by hmbdc which
    //simplifies programming
    void handleMessageCb(Request const& m) {
        char s[sizeof(Result::reversedWord)];
        //bound the length: the incoming word might not be null terminated
        auto l = min(sizeof(s) - 1, strnlen(m.word, sizeof(s) - 1));
        reverse_copy(m.word, m.word + l, s);
        s[l] = 0; //null terminate the reversed word
        sender_->sendBytes(Result::typeTag, s, l + 1);
        periodTps_++;
    }
    void record() { //record the peak transaction per sec
        peakTps_ = max(peakTps_, periodTps_);
        periodTps_ = 0;
    }
    Sender* sender_;
    size_t periodTps_; //transactions in the current 1 sec window
    size_t peakTps_; //read by runServer after ctx.join()
};
//! [declare a partition context]
// use partition so that a message is dispatched to one and only one client in the context
//! [declare a partition context]
/**
 * @brief run server mode
 *
 * hosts 3 WordReverser workers plus the send/recv transport engines in one
 * partition Context; runs until stopped by SIGTERM/SIGINT, then prints each
 * worker's peak tps
 * @param config hmbdc Config holding the "request" and "result" sections
 */
void runServer(Config const& config) {
    ServerContext ctx;
    SingletonGuardian<NetCtx> g; //RAII for udpcast::NetContext resources
    auto& net = NetCtx::instance();
    //we send out messages, need a send engine
    Config resultConfig(config.get_child("result"), config);
    auto sengine = net.createSendTransportEngine(resultConfig
        , sizeof(Result)); //maximum size of the message the engine can send out
    //we receive messages, need a receive engine
    Config requestConfig(config.get_child("request"), config);
    //! [define msg arbitrator]
    auto rengine = net.createRecvTransportEngine(requestConfig
        , ctx.buffer() //put the incoming message into server context's buffer
        //a receive engine can optionally decide what messages to drop/keep by
        //the following
        , [](TransportMessageHeader const* h) { //only keep messages in my range
            if (h->typeTag() == Request::typeTag) {
                auto& m = h->wrapped<Request>();
                uint8_t c = (uint8_t) m.word[0];
                //basically covers everything ascii here
                //to scale the server to multiple hosts, each host could handle a range partition
                //so they collectively cover the whole range
                //(c >= 0 is always true for uint8_t, so only the upper bound is checked)
                if (c <= 127) {
                    return 1; //good
                }
            }
            return -1; //ignore
        }
    );
    //! [define msg arbitrator]
    //! [start multiple direct Clients]
    //this is the main context, it hosts the network transport engine threads
    //and worker threads
    WordReverser wr[3];
    ctx.start(
        //3 worker threads in this host
        //the buffer type used in the ctx ensures a message goes to
        //one and only one of the workers - note transport engines not participating
        wr[0], 0x08ul
        , wr[1], 0x10ul
        , wr[2], 0x20ul
        //normally partition Context only holds homogeneous Clients like above
        //however, transport engines do not participate in message dispatching,
        //so it is safe to run them in any type of Context
        , *sengine, 0x40ul
        , *rengine, 0x80ul);
    //! [start multiple direct Clients]
    //let requests flow in
    net.listenTo(REQUEST_TOPIC);
    //things are running, we need to provide an exit;
    //NOTE(review): restored the HandleSignals call the lambda belongs to -
    //it was dropped in a bad merge/paste, leaving an orphan lambda (syntax error);
    //like most servers, this one is stopped by signal SIGTERM or SIGINT
    hmbdc::os::HandleSignals::onTermIntDo(
        [&ctx] {
            ctx.stop();
        }
    );
    //block until ctx is stopped by signal handler
    ctx.join();
    cout << endl;
    for (size_t i = 0; i < sizeof(wr) / sizeof(wr[0]); i++) {
        cout << "worker " << i << " peak tps=" << wr[i].peakTps_ << endl;
    }
}
/**
 * @brief entry point: 2 args -> server mode, 3 args -> client mode
 * @param argv [1] local interface ip (CIDR ok, e.g. 192.168.0.101/24)
 *             [2] (client only) how many times to repeat each request word
 * @return 0 on normal exit, -1 on bad usage
 */
int main(int argc, char** argv) {
    istringstream is(CONFIG);
    Config config(is);
    //validate arg count up front; the two empty mode branches the original
    //carried here were redundant with the dispatch below
    if (argc != 2 && argc != 3) {
        //NOTE(review): usage previously described argv[2] as "sendBytesPerSec";
        //it is actually the per-word repeat count passed to runClient
        cerr << argv[0] << " incoming-local-ip [request-repeat-count - present means run as client]" << endl;
        cerr << "multicast should be enabled on local-ip network, server side requires 6 cores and client side requires 2" << endl;
        return -1;
    }
    //set global parameters for both transports
    config.put("ifaceAddr", argv[1]);
    if (argc == 3) { //run as client
        runClient(config, lexical_cast<size_t>(argv[2]));
    } else { //run as server
        runServer(config);
    }
    return 0;
}