Source

quechua / modules / channel / stdch / stdch.cc

Full commit
/* 
 * Quechua - the lightweight data mining framework
 *
 * Copyright (C) 2012 Marek Denis <quechua@octogan.net>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef STDCH_CC
#define STDCH_CC

#include "../../../include/quechua.h"
#include "../../../include/interface-channel.h"
#include "../../../include/interface-processor.h"
#include "../../../include/interface-logger.h"
#include "mydata.h"
#include <ev++.h>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>

class stamp_s;

class StandardChannel: public Channel {
public:
    StandardChannel();
    ~StandardChannel() {};
    bool prepare();
    bool start();
    bool stop();

    bool load_misc_conf(const setting_t& misc) {return true;};
    
private:
    const static int BUFFER_SIZE=1024;
    const static ssize_t port = 3333;
    ev::io io;
    int s;
   
    void init();
    
    void accept_cb(ev::io &watcher, int revents);
    void read_cb(ev::io &watcher, int revents);
    void async_cb(ev::async &watcher, int revents);

};

StandardChannel::StandardChannel() : Channel() {
};

void StandardChannel::async_cb(ev::async& watcher, int revents) {
#ifdef DEBUG_MSG
    LOG(DEBUG) << "StandardChannel::async_cb";
#endif
}

void StandardChannel::init() {
}

bool StandardChannel::prepare() {
    internal->watcher.set<StandardChannel,&StandardChannel::async_cb>(this);
    internal->watcher.set(*g_quechua->loop);
    internal->name = "StandardChannel";   
    /** accept connections */
    LOG(INFO) << "Listening on port "<<  port;
 
    struct sockaddr_in addr;
    s = socket(PF_INET, SOCK_STREAM, 0);
 
    addr.sin_family = AF_INET;
    addr.sin_port   = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;
 
    if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
        LOG(ERROR) << "bind";
        return false;
    }
 
    fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK); 
 
    listen(s, 5);
    io.set<StandardChannel,&StandardChannel::accept_cb>(this);
    io.set(*g_quechua->loop);
    return true;
}

bool StandardChannel::start() {
    LOG(INFO) << "StandardChannel::run()";
    io.start(s,ev::READ);
//    internal->watcher.start();
    return true;
};

bool StandardChannel::stop() {
    bool operation_result = true;
    if(close(s)==-1) {
        LOG(ERROR) << "StandardChannel:: error when closing socket";
        operation_result = false;

    }
    io.stop();
    return operation_result;
};

void StandardChannel::read_cb(ev::io &watcher, int revents) {
        LOG(DEBUG) << "StandardChannel::read_cb() start";
        char buffer[1024];
        memset((void*)buffer,0,1024);
#ifdef DEBUG_MSG
        LOG(DEBUG) << "read_cb, fd: " << watcher.fd ;
#endif
        ssize_t nread = recv(watcher.fd, buffer, sizeof(buffer), 0);
 
        if (nread < 0) {
            perror("read error");
            return;
        }
 
        if (nread == 0) {
#ifdef DEBUG_MSG
            LOG(DEBUG) << "Closing descriptor " << watcher.fd;
#endif
            watcher.stop();
            if(close(watcher.fd)==-1) {
                LOG(ERROR) << "StandardChannel: Error while closing socket "
                           << watcher.fd << " errno: " << errno;
            }
            delete &watcher;
            
        } else {
            LOG(DEBUG) << "Got message from channel: " << buffer;
            MyData *data = new MyData;
            data->content  = buffer;

//            memcpy((void*)data->content,(void*)buffer,1024);

            notify_workflows(data);
            data = NULL; // forget about the data collected;
        }
        LOG(DEBUG) << "StandardChannel::read_cb() stop";

};

void StandardChannel::accept_cb(ev::io &watcher, int revents) {
#ifdef DEBUG_MSG
        LOG(DEBUG) << "accept_cb";
#endif
        if (EV_ERROR & revents) {
            perror("got invalid event");
            return;
        }
 
        struct sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);
 
        int client_sd = accept(watcher.fd, (struct sockaddr *)&client_addr, &client_len);
 
        if (client_sd < 0) {
            perror("accept error");
            return;
        }
        
        //set socket non-block
        fcntl(client_sd, F_SETFL, fcntl(client_sd, F_GETFL, 0) | O_NONBLOCK);
        {
#ifdef DEBUG_MSG
            LOG(DEBUG) << "creating callback io watcher client_sd " << client_sd;
#endif
        }

        // deletion of those is handled in read_cb() callback
        ev::io *ev_io = new ev::io;
        ev_io->set<StandardChannel, &StandardChannel::read_cb>(this);
        ev_io->set(*g_quechua->loop);
        ev_io->start(client_sd, ev::READ);
        return;
    }

class StubProcessor: public Processor {
public:
    StubProcessor();
    ~StubProcessor();
    datapack_ptr preprocess(datapack_ptr data);
    datapack_ptr postprocess(datapack_ptr data);
protected:
    void init();
};

StubProcessor::StubProcessor():Processor() {
#ifdef DEBUG_MSG
    LOG(DEBUG) << "StubProcessor createt at: " << this;
#endif
};

StubProcessor::~StubProcessor() {
};

datapack_ptr StubProcessor::preprocess(datapack_ptr data) {
#ifdef DEBUG_MSG
    LOG(DEBUG) << "StubProcessor::preprocess() at " << this;
#endif
  ProcessorData* pack = new ProcessorData;
  MyData* tmp_data = dynamic_cast<MyData*>(data.get());
  pack->content = tmp_data->content;

  return datapack_ptr(pack);
    
};

datapack_ptr StubProcessor::postprocess(datapack_ptr data) {
#ifdef DEBUG_MSG
    {    LOG(DEBUG) << "StubProcessor::postprocess() at " << this;};
#endif
    return data;
};



/* LOGGER DATA */

class FileLoggerData : public DataPack {
public:
    FileLoggerData() : DataPack() {
    }
};

class FileLogger : public Logger {
public:
    FileLogger();
    bool prepare();
    bool start();
    bool stop();
    void logwork(datapack_ptr data);
private:
    int file_fd;
};

FileLogger::FileLogger() : Logger(),file_fd(-2) {
    internal->name = "FileLogger";
};

bool FileLogger::start() {return true;};

bool FileLogger::prepare() {
    const char* log_file = "/tmp/logger.logger";
    file_fd = open(log_file,O_WRONLY|O_CREAT|O_APPEND,S_IRWXU);
    if(file_fd < 0) {
        LOG(ERROR) << "Cannot open " << log_file << " logger file";
        return false;
    }
    LOG(INFO) << "Logger " << internal->name << " prepared";
    return true;
};

bool FileLogger::stop() {
    if(file_fd!=-2){
        if(close(file_fd)==-1) {
            LOG(ERROR) << "FileLogger: Cannot close filedescriptor "<< file_fd;
            return false;
        }
        return true;
    }
    LOG(WARN) << "FileLogger: filedescriptor has default value -2, not closing";
    return true;
};

void FileLogger::logwork(datapack_ptr data) {
    ProcessorData *pdata = reinterpret_cast<ProcessorData*>(data.get());
    string_t result("FileLogger result: ");
    result+=pdata->content;
    write(file_fd,(void*)result.c_str(),result.size());
    LOG(DEBUG) << "Data logged from pack at: " << data.get()
              << " logging: " << result;
    data.reset();
};

/* A SIMPLE FACTORY FOR CREATING AND DESTROYING OBJECTS */
extern "C" Channel* create_channel() {
    return new StandardChannel();
};

extern "C" void destroy_channel(Channel* channel) {
    if(channel!=NULL)
        delete channel;
};

extern "C" Processor* create_processor() {
    return new StubProcessor();
};

extern "C" void destroy_processor(Processor* processor) {
    if(processor!=NULL)
        delete processor;
};


extern "C" Logger* create_logger() {
    return new FileLogger();
};

extern "C" void destroy_logger(Logger* logger) {
    delete logger;
};


#endif