Source

quechua / modules / logger / dionaealogger / dionaealogger.cc

Full commit
/* 
 * Quechua - the lightweight data mining framework
 *
 * Copyright (C) 2012 Marek Denis <quechua@octogan.net>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "dionaealogger.h"

char DionaeaLogger::buf[1024];

DionaeaLogger::DionaeaLogger() : Logger(),connection(NULL) {
};

DionaeaLogger::~DionaeaLogger() {
   if(connection) stop();
}

bool  DionaeaLogger::load_misc_conf(const setting_t& misc) {
   try {
       misc.lookupValue("user",credentials.user);
       misc.lookupValue("password",credentials.password);
       misc.lookupValue("hostname",credentials.hostname);
       misc.lookupValue("database",credentials.database);
   } catch(setting_ex& ex) {
      LOG(WARN) << internal->name << ": One of the required settings was not found. "
                << "You must specify: "
                << "-user\n-pass\n-hostname\n-database name";
       return false;
   }
   return true;
}

bool DionaeaLogger::prepare() {
    LOG(INFO) << internal->name << " "
              << " prepared (nothing was done)";
    return true;
}

bool DionaeaLogger::stop() {
    LOG(INFO) << internal->name << " "
              << "is not stopping";
    return disconnect();
    
}

bool DionaeaLogger::connect() {
   string_t connection_credentials = "dbname=" + credentials.database + 
                                     " user=" + credentials.user +
                                     " password=" + credentials.password + 
                                     " host=" + credentials.hostname;
   
   try { 
       connection = new dbconnection_t(connection_credentials);
   } catch(alloc_ex& ex) {
       LOG(ERROR) << internal->name << ": Cannot allocate memory for connection object";
       return false;
   } catch(connection_ex& ex) {
       LOG(WARN) << internal->name << ": Database connection problem";
       delete connection;
       connection = NULL;
       return false;
   } catch(exception& ex) {
       LOG(WARN) << internal->name << ": Unexpected exception catched: " << ex.what();
       delete connection;
       connection = NULL;
       return false;
   }

   LOG(INFO) << internal->name << ": Connected to "<< credentials.user << "@" 
             << credentials.hostname << ":" << credentials.database;
   return true;
};

bool DionaeaLogger::disconnect() {
   if(!connection)
       return true;
   connection->disconnect();
   LOG(INFO) << internal->name << ": Disconnected from " << credentials.user << "@" 
             << credentials.hostname << ":" << credentials.database;
   connection=NULL;
   return true;
}

void DionaeaLogger::logwork(datapack_ptr data) {
   DionaeaConnections* connections = dynamic_cast<DionaeaConnections*>(data.get());
   db_stamp*           stamp       = dynamic_cast<db_stamp*>(connections->getstamp().get());

   list<DionaeaConnections::unindexedTransaction*>::iterator it;
   list<DionaeaConnections::unindexedTransaction*>::iterator end;
   it = connections->rows.begin();
   end= connections->rows.end();
   
   connect();

   try {
     if(!check_and_complain_connection(connection))
         return;
     dbwork_t work(*connection);
     index_t oid = 0;

     if(!(oid = get_oid(&work)))
         return;
     if(!insert_into_operation(&work,oid,stamp->from,stamp->to,stamp->interval))
         return;

     for(;it!=connections->rows.end();++it) {
         if(!insert_into_freq_itemsets(&work,oid,*it))
             return;
     }
       work.commit();
    
   } catch(connection_ex& ex) {
       LOG(ERROR) << internal->name << ": Check your database and/or connection";
   } catch (commit_ex& ex) {
       LOG(WARN) << internal->name << ": "
                 << "Not sure whether transaction was commited "
                 << ex.what();
   } catch(exception& ex) {
       LOG(ERROR) << internal->name << ": Unhandled exception caught: " << ex.what();
   }

   disconnect();
}

char* DionaeaLogger::quotize(char* dst, size_t n, const char* src) {
   memset(dst,0,n);
   snprintf(dst,n,"'%s'",src);
   return dst;
};

inline bool DionaeaLogger::check_connection(const dbconnection_t* c) {
    return c && c->is_open();
};

inline bool DionaeaLogger::check_and_complain_connection(const dbconnection_t* c) {
    if(check_connection(c))
        return true;
    LOG(ERROR) << internal->name << ": check your database and/or connection";
    return false;
};

bool DionaeaLogger::exec_query(dbwork_t& w,dbresult_t& r, const char* q, bool commit) {
    try {
        r = w.exec(q);
        if(commit)
            w.commit();
    } catch(connection_ex& ex) {
        LOG(ERROR) << internal->name << ": Check your database and/or connection "
                   << ex.what();
        return false;
    } catch (undefined_column_ex& ex) {
        LOG(ERROR) << internal->name << ": Missing column? Check your SQL query and/or database, something is wrong\n"
                   << "Query executed: " << q
                   << " " << ex.what();
        return false;
        
    } catch (commit_ex& ex) {
        LOG(WARN) << internal->name  << ": Not sure whether transaction was commited. " 
                  << ex.what();
        return false;

    } catch(undef_table_ex& ex) {
        LOG(ERROR) << internal->name  << ": Trying to read from non existed table while executing SQL: " 
                   << q << " " << ex.what();
        return false;

    } catch(sql_ex& ex) {
        LOG(ERROR) << internal->name << ": For SQL: " << q
                   << " an SQL exception was caught, discarding the data "
                   << ex.what();
        return false;

    } catch(logic_ex& ex) {
        LOG(CRITIC) << internal->name << ": logic_error exception was caught when executing SQL query. "
                    << ex.what();
        return false;

    } catch(exception& ex) {
        LOG(CRITIC) << internal->name << ": While executing SQL: " << q << " unhandled exception was catched, rethrowing";
        throw ex;
    }

    return true;
}

index_t DionaeaLogger::get_oid(dbwork_t* w) {
    static const char* sel_oid="SELECT nextval('miner.operations_id_seq');";
    dbresult_t result;

    if(!exec_query(*w,result,sel_oid,false) || result.empty() ) {
        LOG(ERROR) << internal->name << ": Couldn't fetch oid";
        return 0;
    }
    return result[0][0].as<index_t>();
  
}

bool DionaeaLogger::insert_into_operation(dbwork_t* w, index_t& oid, const string_t& from, const string_t& to,const int interval) {
   static const char* insert_op = "INSERT INTO miner.operations (id,start_time,end_time,interval) VALUES (%lu, %s, %s,%d);";
   memset(buf,0,1024);
   snprintf(buf,1024,insert_op,oid,from.c_str(),to.c_str(),interval);
   dbresult_t r; // we need faked object for exec_query() - how about making it static ?
   return exec_query(*w,r,buf,false);
// w->exec(buf);
}

bool DionaeaLogger::insert_into_freq_itemsets(dbwork_t* w,index_t& oid,DionaeaConnections::unindexedTransaction* txn) {
   static char proto[8];
   static char src_host[64];
   static char dst_host[64];
   static const char* insert = "INSERT INTO miner.freq_itemsets (oid, protocol, remote_host, remote_port, local_host, local_port, count, generator) VALUES (%lu, %s, %s, %d, %s, %d, %d, %s);";

   memset(buf,0,1024);
   snprintf(buf,1024,insert,oid,
   txn->protocol?quotize(proto,8,txn->protocol):"NULL", // proto
   txn->src_host?quotize(src_host,64,txn->src_host):"NULL", // remote host
   txn->src_port,
   txn->dst_host?quotize(dst_host,64,txn->dst_host):"NULL", // remote host
   txn->dst_port,
   txn->count,
   txn->generator?"true":"false");

   dbresult_t r;// we need faked object for exec_query() - how about making it static ?
   return exec_query(*w,r,buf,false);
// w->exec(buf);
}