hmbdc
simplify-high-performance-messaging-programming
 All Classes Namespaces Functions Variables Friends Pages
rmcast-cp.cpp
//this shows how to use hmbdc-rmcast reliable multicast to cp a file to different hosts by
//customizing the attachment writer that is passed into a rmcast recv engine
//the reason this might be interesting compared to using scp is that it scales with the number of dest hosts
//
//to run:
//
//[source-host ~]$ ./rmcast-cp <iface-addr> <source-dir> <file-name> <recipient-count>
//[dest-host ~]$ ./rmcast-cp <iface-addr> <dest-dir> <file-name>
//for example, the following copies the executable itself to 2 recipient hosts' tmp and home directories respectively
//
//[source-host ~]$ ./rmcast-cp 192.168.0.0/24 ./ rmcast-cp 2
//[dest-host1 ~]$ ./rmcast-cp 192.168.0.0/24 /tmp rmcast-cp
//[dest-host2 ~]$ ./rmcast-cp 192.168.0.0/24 ~ rmcast-cp
//to build:
//g++ example/rmcast-cp.cpp -g -std=c++1y -Wall -Werror -pthread -Ipath-to-hmbdc-lib-include path-to-hmbdc-lib/libhmbdc.a /usr/local/lib/libboost_system.a /usr/local/lib/libboost_iostreams.a -lpthread -lrt -o /tmp/rmcast-cp
//
#include "hmbdc/app/rmcast/NetContext.hpp"
#include "hmbdc/os/Signals.hpp"
#include "hmbdc/os/DownloadFile.hpp"
#include <iostream>
#include <boost/lexical_cast.hpp>
using namespace std;
using namespace hmbdc;
using namespace hmbdc::app;
using namespace hmbdc::app::rmcast;
/**
* @brief a message to hold a memory mapped file
*
*/
struct MemMappedFile
, hasTag<1001> {
};
int main(int argc, char** argv) {
SingletonGuardian<SyncLogger> logGuard(std::cout); //logger helps debugging
bool isSrc = true;
size_t recipientCount = 0;
if (argc == 5) {
isSrc = true;
recipientCount = boost::lexical_cast<size_t>(argv[4]);
} else if (argc == 4) {
isSrc = false;
} else {
cerr << argv[0] << " <iface-addr> <source-dir> <file-name> <recipient-count> | <iface-addr> <dest-dir> <file-name>" << endl;
exit(1);
}
string dir(argv[2]);
string file(argv[3]);
Config config;
config.put("ifaceAddr", argv[1]);
using MyContext = Context<>;
MyContext ctx;
hmbdc::os::HandleSignals::onTermIntDo( /// install signal handler in case we want to abort
[&ctx] {
ctx.stop();
}
);
auto& net = NetContext::instance();
if (isSrc) { //run src mode
config.put("topicRegex", file); //we use the file name as topic
config.put("outBufferSizePower2", 20); //can send up to a 2^20 * 1000 = 1GB file with this buffer size configured
config.put("minRecvToStart", recipientCount); //start transfer until these many recipients are up
auto sengine = net.createSendTransportEngine(config
, max(sizeof(MemMappedFile), sizeof(StartMemorySegTrain))); //has to be able to hold the larger
auto sender = net.getSender(Topic(file));
ctx.start(*sengine, 0);
MemMappedFile m;
m.hasMemoryAttachment::map((dir + "/" + file).c_str());
sender->send(m);
while (sengine->sessionsRemainingActive()) { //wait until all recipients are done
sleep(1);
}
ctx.stop();
} else { //run dest mode
//need to list the attachment message tags here, otherwise no attachment
config.put("allowRecvMemoryAttachment", MemMappedFile::typeTag);
//the default behavior when receiving the memory attachment is to allocate memeory
//to hold the attachment
//in this case, we don't want the default since we are writing the attachment into a file.
//here, we write an attachment writer that does the job
struct FileWriter {
hasMemoryAttachment::AfterConsumedCleanupFunc afterConsumedCleanupFunc;
string dir_;
MyContext& ctx_;
//! [write a attachment writer]
FileWriter(string dir, MyContext& ctx)
: afterConsumedCleanupFunc(nullptr) //don't plan to clear anything here
, dir_(dir)
, ctx_(ctx) {
}
/**
* @brief this is called when the receiver starting to receive the attachment
* msg
*
* @param topic msg is on this topic
* @param senderId sender id
* @param typeTag msg tag
* @param a description of the attachment that is coming - not completed yet
* @param scratchpad user can set this value and expected it to be passed back in
* the sunsequent oneSegReady call for this message
*/
void
allocateFor(std::pair<char const*, char const*> topic
, std::string const& senderId
, uint16_t typeTag
, void*& scratchpad) {
auto df = new os::DownloadFile();
df->open(dir_.c_str(), string(topic.first, topic.second).c_str());
scratchpad = df; //have scratchpad remember df to use it later
a->attachment = nullptr;
a->len = 0;
}
/**
* @brief one segment of msg attachment arrived
* @details will be called once for each segment that arrives in order
*
* @param scratchpad the value that was set previously is passed back
* @param a description of the attachment
* @param seg segment adress
* @param segLen segment length
* @param remainingSegCount how many more segments in this attachment
*/
void
oneSegReady(void*& scratchpad
, void const* seg
, uint16_t segLen
, size_t remainingSegCount) {
auto df = (os::DownloadFile*)(scratchpad);
df->write(seg, segLen);
if (!remainingSegCount) {
//clean up here
df->close();
delete df;
ctx_.stop(); //receiver side quit
}
}
};
//! [write a attachment writer]
auto recvEngine = net.createRecvTransportEngine(config
, ctx.buffer()
, RecvTransport::NoOpArb()
, FileWriter(string(dir), ctx)
);
net.listenTo(Topic(file));
ctx.start(*recvEngine, 0);
}
ctx.join();
}