Wiki
Clone wikiaiengine / DatabaseIntegrationPython
Database integration
One of the main functions of the engine is its easy integration with databases. Let's look at some examples of how the database interface works.
In the first example we will use Redis (http://redis.io/). We create a class called redisAdaptor that implements the methods update, insert and remove. For every new flow that the system receives, the method insert will be called.
import pyaiengine
import redis


class redisAdaptor(pyaiengine.DatabaseAdaptor):
    """Adaptor that persists the engine's flows into a Redis hash.

    The engine calls insert() the first time a flow is seen, update()
    with the flow's JSON data, and remove() when the flow is dropped.
    """

    def __init__(self):
        self.__db = None

    def connect(self, connection_str):
        # connection_str is passed straight to the Redis client
        # (presumably the host name — confirm against redis-py).
        self.__db = redis.Redis(connection_str)

    def update(self, key, data):
        # Overwrite the JSON blob stored for this flow.
        self.__db.hset("udpflows", key, data)

    def insert(self, key):
        # First sighting of the flow: store an empty JSON object.
        self.__db.hset("udpflows", key, "{}")

    def remove(self, key):
        self.__db.hdel("udpflows", key)
As a second example, you can use Cassandra (https://cassandra.apache.org/) instead:
import pyaiengine
import pycassa
import json


class cassandraAdaptor(pyaiengine.DatabaseAdaptor):
    """DatabaseAdaptor that stores network flows in a Cassandra column family.

    Methods required by the engine:
      - insert: called on the first insertion of the network flow.
      - update: called periodically, depending on the sample value selected.
      - remove: called when the flow is destroyed.
    """

    def __init__(self):
        self.__c = None
        self.__pool = None

    def connect(self, connection_str):
        # NOTE(review): connection_str is ignored here; the pool is
        # hard-wired to the local node and the 'demo' keyspace.
        self.__pool = pycassa.ConnectionPool(keyspace='demo',
                                             server_list=['127.0.0.1:9160'],
                                             prefill=False)
        self.__c = pycassa.ColumnFamily(self.__pool, 'flows')

    def update(self, key, data):
        obj = json.loads(data)
        # Build the Cassandra row first, then attach the layer-7 info.
        # (The original code assigned d["layer7info"] before d existed —
        # a NameError — and then rebuilt d, discarding the value.)
        row = {'bytes': obj["bytes"], 'layer7': obj["layer7"]}
        # Prefer the HTTP host name; fall back to the SSL host name.
        l7info = obj.get("httphost", 0)
        if l7info == 0:
            l7info = obj.get("sslphost", 0)
        row['layer7info'] = l7info
        self.__c.insert(key, row)

    def insert(self, key):
        # New flow: initialise the byte counter.
        self.__c.insert(key, {'bytes': 0})

    def remove(self, key):
        # We don't remove anything on this example.
        pass
Or store the flows in an HDF5 file through the PyTables (https://pytables.github.io/) interface:
import pyaiengine
import tables
import json


class hadoopFlow(tables.IsDescription):
    # Schema of one flow row in the HDF5 table.
    name = tables.StringCol(50, pos=1)
    bytes = tables.Int32Col(pos=2)
    l7 = tables.StringCol(32, pos=3)
    layer7info = tables.StringCol(64, pos=4)


class hadoopAdaptor(pyaiengine.DatabaseAdaptor):
    """DatabaseAdaptor that stores flows in an HDF5 file via PyTables."""

    def __init__(self):
        self.__file = None
        self.__group = None
        self.__table = None

    def connect(self, connection_str):
        # connection_str is the path of the HDF5 file to create.
        self.__file = tables.open_file(connection_str, mode="w")
        self.__group = self.__file.create_group(self.__file.root, "flows")
        self.__table_tcp = self.__file.create_table(self.__group, 'table_tcp',
                                                    hadoopFlow, "Flow table",
                                                    tables.Filters(0))
        self.__table_udp = self.__file.create_table(self.__group, 'table_udp',
                                                    hadoopFlow, "Flow table",
                                                    tables.Filters(0))

    def __handle_tcp(self, key, obj):
        # This method was missing in the original code although update()
        # calls it for every TCP flow (AttributeError). It mirrors
        # __handle_udp, taking the layer-7 info from the HTTP host name
        # with a fallback to the SSL host name.
        query = "name == b'%s'" % key
        for f in self.__table_tcp.where(query):
            f['bytes'] = obj["bytes"]
            f['l7'] = obj["layer7"]
            l7info = obj.get("httphost", 0)
            if l7info == 0:
                l7info = obj.get("sslphost", 0)
            if l7info != 0:
                f['layer7info'] = l7info
            f.update()

    def __handle_udp(self, key, obj):
        query = "name == b'%s'" % key
        for f in self.__table_udp.where(query):
            f['bytes'] = obj["bytes"]
            f['l7'] = obj["layer7"]
            # DNS domain is the only layer-7 info handled for UDP flows.
            l7info = obj.get("dnsdomain", 0)
            if l7info > 0:
                f['layer7info'] = l7info
            f.update()

    def update(self, key, data):
        try:
            obj = json.loads(data)
        except ValueError:
            # Malformed JSON from the engine: report it and skip the
            # sample (narrowed from a bare except that hid every error).
            print("ERROR:", data)
            return
        # The protocol number is the third ':'-separated field of the
        # flow key; 6 is TCP, anything else is treated as UDP.
        proto = int(key.split(":")[2])
        if proto == 6:
            self.__handle_tcp(key, obj)
        else:
            self.__handle_udp(key, obj)

    def insert(self, key):
        proto = int(key.split(":")[2])
        if proto == 6:
            t = self.__table_tcp
        else:
            t = self.__table_udp
        f = t.row
        f['name'] = key
        f['bytes'] = 0
        f.append()
        t.flush()

    def remove(self, key):
        # We don't remove anything on this example.
        pass
We create a new instance of a LAN network stack in the main code:
# Create the stack that will process the LAN traffic.
st = pyaiengine.StackLan()
Allocate the maximum number of UDP flows on the system
# Pre-allocate room for up to 163840 concurrent UDP flows on the stack.
st.udp_flows = 163840
Create a new instance of the DatabaseAdaptor and plug it into the UDP part of the engine, so only UDP traffic will be processed.
# Use your own adaptor (redisAdaptor, cassandraAdaptor, hadoopAdaptor)
db = redisAdaptor()
db.connect("localhost")
# Attach the adaptor to the UDP side of the stack; 16 is the sampling
# value (presumably packets between update() calls — confirm in the docs).
st.set_udp_database_adaptor(db, 16)
Open the network device, attach the stack, and let the engine run.
# Open "eth0", hand the stack to the dispatcher and process packets
# until interrupted; the context manager closes the device on exit.
with pyaiengine.PacketDispatcher("eth0") as pd:
    pd.stack = st
    pd.run()
Now you can check the results on the redis/cassandra/hadoop database.
Updated