#include <hmbdc/app/Base.hpp>
#include <iostream>
#include <memory>
#include <future>
#include <unistd.h>
struct Request
Request(uint16_t srcId, uint64_t id)
: srcId(srcId)
, id(id)
{}
uint16_t srcId;
uint64_t id;
};
struct Response
Response(Request const& r)
: srcId(r.srcId)
, respToId(r.id)
{}
uint16_t srcId;
uint64_t respToId;
};
struct Finish
Finish(uint16_t srcId)
: srcId(srcId)
{}
uint16_t srcId;
};
struct MyClient
:
Client<MyClient, Request, Response> {
MyClient(MyContext* myCtx, uint64_t workLoad, uint64_t clientCount, uint16_t srcId)
: myCtx(myCtx)
, srcId(srcId)
, requestId(0u)
, workLoad(workLoad)
, clientCount(clientCount)
, respCount(0u){
}
void messageDispatchingStartedCb(uint16_t) override {
if (workLoad) {
myCtx->send(Request(srcId, requestId));
}
}
void handleMessageCb(Request const& r) {
myCtx->template sendInPlace<Response>(r);
}
void handleMessageCb(Response const& resp) {
if (resp.srcId == srcId &&
resp.respToId == requestId) {
if (++respCount == clientCount) {
workLoad--;
respCount = 0u;
++requestId;
if (workLoad) {
myCtx->send(Request(srcId, requestId));
} else {
Finish fin(srcId);
myCtx->send(fin);
}
}
}
}
void stoppedCb(std::exception const& e) override {
std::cout << e.what() << std::endl;
};
MyContext* myCtx;
uint16_t srcId;
uint64_t requestId;
uint64_t workLoad;
uint64_t clientCount;
uint64_t respCount;
};
struct Monitor
:
Client<Monitor, Request, Response, Finish> {
Monitor(uint16_t count)
: count(count)
, totalRequest(0u)
, totalResponse(0u)
{}
char const* hmbdcName() const { return "monitor"; }
void handleMessageCb(Finish const& fin) {
count--;
if (!count) {
allDone.set_value(true);
}
}
void handleMessageCb(Request const&) {
totalRequest++;
}
void handleMessageCb(Response const&) {
totalResponse++;
}
void report() const {
std::cout << "totalRequest=" << totalRequest << std::endl;
std::cout << "totalResponse=" << totalResponse << std::endl;
}
uint16_t count;
size_t totalRequest;
size_t totalResponse;
std::promise<bool> allDone;
};
int main() {
const uint16_t clientCount = 40u;
const uint64_t workLoad = 1000u;
MyContext ctx(20u, clientCount);
std::unique_ptr<MyClient> clients[clientCount];
Monitor mon(clientCount);
for (uint16_t i = 0; i < clientCount; ++i) {
clients[i].reset(new MyClient(&ctx, workLoad, clientCount, i));
ctx.addToPool(*clients[i]);
}
ctx.start(3, 0x07ul
, mon, 0ul);
mon.allDone.get_future().get();
ctx.stop();
ctx.join();
mon.report();
}