Source

quechua / modules / logger / postprocessortest / iplogger.cc

/* 
 * Quechua - the lightweight data mining framework
 *
 * Copyright (C) 2012 Marek Denis <quechua@octogan.net>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "iplogger.h"

// Definition of the static scratch buffer shared by every PostProcessorLogger
// instance. insert_into_operation() and insert_into_preprocesstest() clear it
// and format one SQL statement into it before executing that statement.
// No locking around the buffer is visible in this file.
char PostProcessorLogger::buf[1024];

/**
 * Constructs the logger without opening a database connection.
 *
 * The connection pointer is reset here because the destructor and stop()
 * both test it, and neither requires prepare() to have run first; an
 * indeterminate pointer would make those tests undefined behaviour.
 */
PostProcessorLogger::PostProcessorLogger() : Logger() {
   connection = NULL;
}

/**
 * Destroys the logger. When a connection object is still set, stop() is
 * called so the database session is closed before the object goes away.
 */
PostProcessorLogger::~PostProcessorLogger() {
   if(!connection) {
       return;
   }
   stop();
}

bool  PostProcessorLogger::load_misc_conf(const setting_t& misc) {
   try {
       misc.lookupValue("user",credentials.user);
       misc.lookupValue("password",credentials.password);
       misc.lookupValue("hostname",credentials.hostname);
       misc.lookupValue("database",credentials.database);
   } catch(setting_ex& ex) {
      LOG(WARN) << "One of the required settings was not found. "
                << "You must specify: "
                << "-user\n-pass\n-hostname\n-database name";
       return false;
   }
   return true;
}

bool PostProcessorLogger::prepare() {
   string_t connection_credentials = "dbname=" + credentials.database + 
                                     " user=" + credentials.user +
                                     " password=" + credentials.password + 
                                     " host=" + credentials.hostname;
   try {
       connection = new dbconnection_t(connection_credentials);
   } catch(alloc_ex& ex) {
       LOG(ERROR) << "dionaeaHarvester: Cannot allocate memory for connection object";
       return false;
   } catch(connection_ex& ex) {
       LOG(WARN) << "PostProcessorLogger: Database connection problem";
       delete connection;
       connection = NULL;
       return false;
   } catch(exception& ex) {
       LOG(WARN) << "Unexpected exception catched: " << ex.what();
       delete connection;
       connection = NULL;
       return false;
   }

   LOG(INFO) << "Connected to "<< credentials.user << "@" 
             << credentials.hostname << ":" << credentials.database;
   return true;
};

bool PostProcessorLogger::stop() {
   if(!connection)
       return true;
   connection->disconnect();
   LOG(INFO) << "Logger: Disconnected from " << credentials.user << "@" 
             << credentials.hostname << ":" << credentials.database;
   connection=NULL;
   return true;
}

/**
 * Stores one data pack in the database inside a single transaction.
 *
 * Steps: fetch a fresh operation id, insert the operation row for the
 * stamp's from/to interval, insert one postprocess_test row per entry of
 * the pack's `rows` list, then commit. The pack is dropped with a warning
 * when there is no connection, when it is not a DionaeaConnections object,
 * or when its stamp is not a db_stamp. When no operation id can be
 * fetched, nothing is inserted or committed.
 *
 * @param data data pack expected to hold a DionaeaConnections object
 */
void PostProcessorLogger::logwork(datapack_ptr data) {
   if(!connection) {
       LOG(WARN) << "Logger " << internal->name << ". "
                 << "No database connection, data pack dropped";
       return;
   }

   // Either dynamic_cast yields NULL on a type mismatch; check before use.
   DionaeaConnections* connections = dynamic_cast<DionaeaConnections*>(data.get());
   if(!connections) {
       LOG(WARN) << "Logger " << internal->name << ". "
                 << "Unexpected data pack type, data pack dropped";
       return;
   }

   db_stamp* stamp = dynamic_cast<db_stamp*>(connections->getstamp().get());
   if(!stamp) {
       LOG(WARN) << "Logger " << internal->name << ". "
                 << "Unexpected stamp type, data pack dropped";
       return;
   }

   dbwork_t work(*connection);
   index_t oid = get_oid(&work);

   // get_oid() returns 0 after logging its own error.
   if(!oid) return;

   insert_into_operation(&work,oid,stamp->from,stamp->to);

   list<DionaeaConnections::unindexedTransaction*>::iterator it;
   for(it = connections->rows.begin(); it != connections->rows.end(); ++it) {
       insert_into_preprocesstest(&work,oid,*it);
   }

   try {
       work.commit();
   } catch (const commit_ex&) {
       LOG(WARN) << "Logger " << internal->name << ". "
                 << "Not sure whether transaction was commited for oid: " << oid;
   }
}

/**
 * Writes `src` into `dst` as a single-quoted SQL string literal.
 *
 * Embedded single quotes are doubled. When the text does not fit, it is cut
 * short but the closing quote is still written, so the result is always a
 * complete literal. For input that fits and contains no quotes the output
 * is the plain text wrapped in quotes.
 *
 * Backslashes are copied unchanged.
 * NOTE(review): that is only safe if the server treats backslashes in
 * ordinary literals as plain characters - confirm the server setting.
 *
 * @param dst destination buffer; zero-filled before writing
 * @param n   size of `dst` in bytes; below 3 there is no room for a pair of
 *            quotes plus the terminator and `dst` is left empty
 * @param src NUL-terminated text; NULL is treated as an empty string
 * @return `dst`
 */
char* PostProcessorLogger::quotize(char* dst, size_t n, const char* src) {
   if(!dst || n == 0)
       return dst;

   memset(dst,0,n);
   if(n < 3)
       return dst;

   // Content may occupy indices 1 .. n-3; index n-2 at the latest holds the
   // closing quote and index n-1 stays NUL from the memset above.
   const size_t limit = n - 2;
   size_t pos = 0;
   dst[pos++] = '\'';

   for(const char* p = src ? src : ""; *p; ++p) {
       const size_t needed = (*p == '\'') ? 2 : 1;
       if(pos + needed > limit)
           break; // never split a doubled quote across the cut
       if(needed == 2)
           dst[pos++] = '\'';
       dst[pos++] = *p;
   }

   dst[pos] = '\'';
   return dst;
}

/**
 * Draws the next operation id from the miner.operation_id_seq sequence.
 *
 * @param w transaction the query is executed in
 * @return the fetched id, or 0 when the query returned no rows (an error
 *         is logged in that case)
 */
index_t PostProcessorLogger::get_oid(dbwork_t* w) {
   const char* const next_oid_query = "SELECT nextval('miner.operation_id_seq');";
   dbresult_t rows = w->exec(next_oid_query);

   if(!rows.empty()) {
       return rows[0][0].as<index_t>();
   }

   LOG(ERROR) << "Logger "<< internal->name << " at: " << this << " Couldn't fetch oid";
   return 0;
}



/**
 * Inserts the row describing one logging operation into miner.operation.
 *
 * `from` and `to` are placed into the statement verbatim, without quoting.
 * NOTE(review): callers presumably pass text that already is a valid SQL
 * expression (e.g. a quoted timestamp) - confirm against db_stamp.
 *
 * A statement that does not fit the shared buffer is not executed, since a
 * cut-off statement would be malformed SQL; a warning is logged instead.
 *
 * @param w    transaction the statement is executed in
 * @param oid  operation id, used as the row id
 * @param from text inserted as start_time
 * @param to   text inserted as end_time
 */
void PostProcessorLogger::insert_into_operation(dbwork_t* w, index_t& oid, const string_t& from, const string_t& to) {
   static const char* insert_op = "INSERT INTO miner.operation (id,start_time,end_time) VALUES (%lu, %s, %s);";
   memset(buf,0,sizeof(buf));
   // The cast makes the argument match %lu whatever width index_t has.
   const int written = snprintf(buf,sizeof(buf),insert_op,
                                static_cast<unsigned long>(oid),from.c_str(),to.c_str());
   // snprintf returns the length the full text needs; a value at or above
   // the buffer size means the output was truncated.
   if(written < 0 || static_cast<size_t>(written) >= sizeof(buf)) {
       LOG(WARN) << "Logger " << internal->name << ". "
                 << "Operation statement does not fit the query buffer, skipped for oid: " << oid;
       return;
   }
   w->exec(buf);
}

/**
 * Inserts one transaction record into miner.postprocess_test.
 *
 * The protocol and both host fields are written as quoted literals, or as
 * the SQL keyword NULL when the corresponding pointer is NULL. A NULL `txn`
 * is ignored. A statement that does not fit the shared buffer is not
 * executed, since a cut-off statement would be malformed SQL; a warning is
 * logged instead.
 *
 * @param w   transaction the statement is executed in
 * @param oid operation id the row belongs to
 * @param txn record to store
 */
void PostProcessorLogger::insert_into_preprocesstest(dbwork_t* w,index_t& oid,DionaeaConnections::unindexedTransaction* txn) {
   static const char* insert = "INSERT INTO miner.postprocess_test (oid, protocol, remote_host, remote_port, local_host, local_port, count, generator) VALUES (%lu, %s, %s, %d, %s, %d, %d, %s);";

   if(!txn)
       return;

   // Per-call scratch space for the quoted fields. The protocol buffer was
   // 8 bytes, which left room for only 5 characters between the quotes.
   char proto[32];
   char src_host[64];
   char dst_host[64];

   memset(buf,0,sizeof(buf));
   // NOTE(review): the %d arguments are passed as they are; confirm that
   // src_port, dst_port and count are int-compatible in unindexedTransaction.
   const int written = snprintf(buf,sizeof(buf),insert,
   static_cast<unsigned long>(oid), // matches %lu whatever width index_t has
   txn->protocol?quotize(proto,sizeof(proto),txn->protocol):"NULL", // protocol
   txn->src_host?quotize(src_host,sizeof(src_host),txn->src_host):"NULL", // remote host
   txn->src_port,
   txn->dst_host?quotize(dst_host,sizeof(dst_host),txn->dst_host):"NULL", // local host
   txn->dst_port,
   txn->count,
   txn->generator?"true":"false");

   // snprintf returns the length the full text needs; a value at or above
   // the buffer size means the output was truncated.
   if(written < 0 || static_cast<size_t>(written) >= sizeof(buf)) {
       LOG(WARN) << "Logger " << internal->name << ". "
                 << "Row statement does not fit the query buffer, skipped for oid: " << oid;
       return;
   }

   w->exec(buf);
}