Commits

Quinton Anderson committed f989e24

Completed the spout and the Drools integration; the remaining bolts will get done tomorrow.

  • Parent commits fd7dabc

Files changed (20)

File dropAndCreateSchema.txt

+drop keyspace Logging;
+

File pom.xml

 			<scope>test</scope>
 		</dependency>
 		<dependency>
-			<groupId>org.hectorclient</groupId>
-			<artifactId>hector-core</artifactId>
-			<version>1.1-2</version>
-		</dependency>
-		<dependency>
 			<groupId>org.slf4j</groupId>
 			<artifactId>slf4j-log4j12</artifactId>
 			<version>1.6.1</version>
 		</dependency>
         <dependency>
             <groupId>org.jmock</groupId>
-            <artifactId>jmock-junit4</artifactId>
-            <version>2.5.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.jmock</groupId>
             <artifactId>jmock-legacy</artifactId>
             <version>2.5.1</version>
             <scope>test</scope>
-        </dependency>		<dependency>
+        </dependency>		
+        <dependency>
 			<groupId>storm</groupId>
 			<artifactId>storm</artifactId>
 			<version>0.8.1</version>
 			<!-- keep storm out of the jar-with-dependencies -->
 			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<artifactId>slf4j-api</artifactId>
+					<groupId>org.slf4j</groupId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 		<dependency>
 			<groupId>com.googlecode.json-simple</groupId>
         </dependency>
         <dependency>
             <groupId>org.jmock</groupId>
-            <artifactId>jmock-junit3</artifactId>
+            <artifactId>jmock-junit4</artifactId>
             <version>2.5.1</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+      		<groupId>com.rapportive</groupId>
+      		<artifactId>storm-json</artifactId>
+      		<version>0.0.1</version>
+    	</dependency>
+    	<dependency>
+      		<groupId>com.github.ptgoetz</groupId>
+      		<artifactId>storm-cassandra</artifactId>
+      		<version>0.3.1-SNAPSHOT</version>
+    	</dependency>
+    	<dependency>
+      		<groupId>org.elasticsearch</groupId>
+      		<artifactId>elasticsearch</artifactId>
+      		<version>0.20.2</version>
+    	</dependency>
+    	<dependency>
+      		<groupId>org.drools</groupId>
+      		<artifactId>drools-core</artifactId>
+      		<version>5.5.0.Final</version>
+    	</dependency>
+    	<dependency>
+        	<groupId>org.drools</groupId>
+        	<artifactId>drools-compiler</artifactId>
+        	<version>5.5.0.Final</version>
+    	</dependency>
     </dependencies>
 
 	<build>
+	    <resources>
+			<resource>
+				<directory>${basedir}/multilang</directory>
+			</resource>
+		</resources>
 		<plugins>
 		    <!-- 
 		    bind the maven-assembly-plugin to the package phase

File src/main/java/storm/cookbook/LogTopology.java

-package storm.cookbook;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.*;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-public class LogTopology {
-
-    private TopologyBuilder builder = new TopologyBuilder();
-    private Config conf = new Config();
-    private LocalCluster cluster;
-
-    public static final String DEFAULT_JEDIS_PORT = "6379";
-
-    public LogTopology(){
-        
-    }
-
-    public TopologyBuilder getBuilder() {
-        return builder;
-    }
-    
-    public LocalCluster getLocalCluster(){
-    	return cluster;
-    }
-
-    public Config getConf() {
-        return conf;
-    }
-
-    public void runLocal(int runTime){
-        conf.setDebug(true);
-        //conf.put(Conf.REDIS_HOST_KEY, "localhost");
-        cluster = new LocalCluster();
-        cluster.submitTopology("test", conf, builder.createTopology());
-        if(runTime > 0){
-            Utils.sleep(runTime);
-            shutDownLocal();
-        }
-    }
-
-    public void shutDownLocal(){
-        if(cluster != null){
-            cluster.killTopology("test");
-            cluster.shutdown();
-        }
-    }
-
-    public void runCluster(String name, String redisHost) throws AlreadyAliveException, InvalidTopologyException {
-        conf.setNumWorkers(20);
-        //conf.put(Conf.REDIS_HOST_KEY, redisHost);
-        StormSubmitter.submitTopology(name, conf, builder.createTopology());
-    }
-
-
-
-	public static void main(String[] args) throws Exception {
-
-        LogTopology topology = new LogTopology();
-        
-
-        if(args!=null && args.length > 1) {
-            topology.runCluster(args[0], args[1]);
-        } else {
-            if(args!=null && args.length == 1)
-                System.out.println("Running in local mode, redis ip missing for cluster run");
-            topology.runLocal(10000);
-        }
-
-	}
-
-}

File src/main/java/storm/cookbook/log/Conf.java

+package storm.cookbook.log;
+
+/**
+ * Shared configuration key names for the log topology.
+ */
+public class Conf {
+    public static final String REDIS_HOST_KEY = "redisHost";
+    public static final String REDIS_PORT_KEY = "redisPort";
+}

File src/main/java/storm/cookbook/log/FieldNames.java

+package storm.cookbook.log;
+
+public class FieldNames {
+    public static final String LOG_ENTRY = "LogEntry";
+}

File src/main/java/storm/cookbook/log/LogRulesBolt.java

+package storm.cookbook.log;
+
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.drools.KnowledgeBase;
+import org.drools.KnowledgeBaseFactory;
+import org.drools.builder.KnowledgeBuilder;
+import org.drools.builder.KnowledgeBuilderFactory;
+import org.drools.builder.ResourceType;
+import org.drools.io.ResourceFactory;
+import org.drools.runtime.StatelessKnowledgeSession;
+
+import storm.cookbook.log.model.LogEntry;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class LogRulesBolt extends BaseRichBolt {
+	
+	private static final long serialVersionUID = 1L;
+	public static Logger LOG = Logger.getLogger(LogRulesBolt.class);
+	private StatelessKnowledgeSession ksession;
+	private OutputCollector collector;
+
+	@Override
+	public void prepare(Map stormConf, TopologyContext context,
+			OutputCollector collector) {
+		this.collector = collector;
+		//TODO: load the rule definitions from an external agent instead of the classpath.
+		KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+		kbuilder.add( ResourceFactory.newClassPathResource( "/Syslog.drl", 
+		              getClass() ), ResourceType.DRL );
+		if ( kbuilder.hasErrors() ) {
+		    LOG.error( kbuilder.getErrors().toString() );
+		}
+		KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
+		kbase.addKnowledgePackages( kbuilder.getKnowledgePackages() );	
+		ksession = kbase.newStatelessKnowledgeSession();
+	}
+
+	@Override
+	public void execute(Tuple input) {
+		LogEntry entry = (LogEntry)input.getValueByField(FieldNames.LOG_ENTRY);
+		if(entry == null){
+			LOG.fatal( "Received null or incorrect value from tuple" );
+			return;
+		}
+		ksession.execute( entry );
+		if(!entry.isFilter()){
+			collector.emit(new Values(entry));
+		}
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields(FieldNames.LOG_ENTRY));
+	}
+
+}
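
The TODO in prepare() points at loading rules from an external agent rather than the classpath. A minimal sketch of what that could look like with the Drools 5 KnowledgeAgent; the agent name and the changeset.xml path are assumptions, not part of this commit:

    import org.drools.agent.KnowledgeAgent;
    import org.drools.agent.KnowledgeAgentFactory;
    import org.drools.io.ResourceFactory;
    import org.drools.runtime.StatelessKnowledgeSession;

    public class AgentBackedSession {
        // Builds a session whose rule base the agent can refresh at runtime.
        // "changeset.xml" is assumed to list Syslog.drl as a DRL resource.
        public static StatelessKnowledgeSession create() {
            KnowledgeAgent kagent = KnowledgeAgentFactory.newKnowledgeAgent("logRulesAgent");
            kagent.applyChangeSet(ResourceFactory.newClassPathResource("changeset.xml"));
            return kagent.getKnowledgeBase().newStatelessKnowledgeSession();
        }
    }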

File src/main/java/storm/cookbook/log/LogSpout.java

+package storm.cookbook.log;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Values;
+import backtype.storm.tuple.Fields;
+
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import redis.clients.jedis.Jedis;
+import storm.cookbook.log.model.LogEntry;
+
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+public class LogSpout extends BaseRichSpout {
+
+    public static Logger LOG = Logger.getLogger(LogSpout.class);
+    
+    public static final String LOG_CHANNEL = "log";
+
+    private Jedis jedis;
+    private String host;
+    private int port;
+    private SpoutOutputCollector collector;
+
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields(FieldNames.LOG_ENTRY));
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        host = conf.get(Conf.REDIS_HOST_KEY).toString();
+        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
+        this.collector = spoutOutputCollector;
+        connectToRedis();
+    }
+
+    private void connectToRedis() {
+        jedis = new Jedis(host, port);
+    }
+
+    @Override
+    public void nextTuple() {
+        String content = jedis.rpop(LOG_CHANNEL);
+        if(content==null || "nil".equals(content)) {
+            try { Thread.sleep(300); } catch (InterruptedException e) {} // queue empty; back off briefly
+        } else {
+            JSONObject obj=(JSONObject) JSONValue.parse(content);
+            LogEntry entry = new LogEntry(obj);
+            collector.emit(new Values(entry));
+        }
+    }
+}
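
Anything that pushes logstash-style JSON onto the "log" list feeds this spout; the integration test below does exactly that. A minimal standalone producer sketch, assuming Redis on localhost:6379 (the JSON mirrors testData1.json so the LogEntry constructor finds every field it reads):

    import redis.clients.jedis.Jedis;

    public class LogProducer {
        // Pushes one logstash-style document onto the "log" list that
        // LogSpout polls with rpop. Host and port are local-test assumptions.
        public static void main(String[] args) {
            Jedis jedis = new Jedis("localhost", 6379);
            jedis.rpush("log",
                "{\"@source\":\"file://host/var/log/auth.log\"," +
                "\"@tags\":[],\"@fields\":{}," +
                "\"@timestamp\":\"2013-01-04T11:09:16.171Z\"," +
                "\"@source_host\":\"host\",\"@source_path\":\"/var/log/auth.log\"," +
                "\"@message\":\"test entry\",\"@type\":\"syslog\"}");
            jedis.disconnect();
        }
    }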

File src/main/java/storm/cookbook/log/LogTopology.java

+package storm.cookbook.log;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+
+public class LogTopology {
+
+	private TopologyBuilder builder = new TopologyBuilder();
+	private Config conf = new Config();
+	private LocalCluster cluster;
+
+	public static final String DEFAULT_JEDIS_PORT = "6379";
+
+	public LogTopology() {
+		builder.setSpout("logSpout", new LogSpout(), 10);
+
+		builder.setBolt("logRules", new LogRulesBolt(), 10).shuffleGrouping(
+				"logSpout");
+
+		// indexer bolt (shuffle from rules)
+		// volume count from rules (field grouping per file)
+		// Volume persister, shuffle grouping from volume count.
+
+		// Maybe add:
+		// Stem and stop word counting per file
+		// The persister for the stem analysis (need to check the counting
+		// capability first on storm-cassandra)
+
+		conf.put(Conf.REDIS_PORT_KEY, DEFAULT_JEDIS_PORT);
+	}
+
+	public TopologyBuilder getBuilder() {
+		return builder;
+	}
+
+	public LocalCluster getLocalCluster() {
+		return cluster;
+	}
+
+	public Config getConf() {
+		return conf;
+	}
+
+	public void runLocal(int runTime) {
+		conf.setDebug(true);
+		conf.put(Conf.REDIS_HOST_KEY, "localhost");
+		cluster = new LocalCluster();
+		cluster.submitTopology("test", conf, builder.createTopology());
+		if (runTime > 0) {
+			Utils.sleep(runTime);
+			shutDownLocal();
+		}
+	}
+
+	public void shutDownLocal() {
+		if (cluster != null) {
+			cluster.killTopology("test");
+			cluster.shutdown();
+		}
+	}
+
+	public void runCluster(String name, String redisHost)
+			throws AlreadyAliveException, InvalidTopologyException {
+		conf.setNumWorkers(20);
+		conf.put(Conf.REDIS_HOST_KEY, redisHost);
+		StormSubmitter.submitTopology(name, conf, builder.createTopology());
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		LogTopology topology = new LogTopology();
+
+		if (args != null && args.length > 1) {
+			topology.runCluster(args[0], args[1]);
+		} else {
+			if (args != null && args.length == 1)
+				System.out
+						.println("Running in local mode, redis ip missing for cluster run");
+			topology.runLocal(10000);
+		}
+
+	}
+
+}
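
The constructor comments above list the bolts still to come. Purely as an illustration of the intended wiring, the additions might look roughly like the fragment below; IndexerBolt, VolumeCountBolt and VolumePersisterBolt are hypothetical names, not classes in this commit:

    // Hypothetical wiring for the bolts sketched in the constructor comments.
    builder.setBolt("indexer", new IndexerBolt(), 10)
            .shuffleGrouping("logRules");
    builder.setBolt("volumeCount", new VolumeCountBolt(), 10)
            .fieldsGrouping("logRules", new Fields(FieldNames.LOG_ENTRY));
    builder.setBolt("volumePersister", new VolumePersisterBolt(), 10)
            .shuffleGrouping("volumeCount");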

File src/main/java/storm/cookbook/log/model/LogEntry.java

+package storm.cookbook.log.model;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class LogEntry {
+	
+	public static Logger LOG = Logger.getLogger(LogEntry.class);
+	
+	private String source;
+	
+	private String type;
+	
+	private List<String> tags = new LinkedList<String>();
+	
+	private Map<String,String> fields = new HashMap<String, String>();
+	
+	private Date timestamp;
+	
+	private String sourceHost;
+	
+	private String sourcePath;
+	
+	private String message = "";
+	
+	private boolean filter = false;
+	
+	private NotificationDetails notifyAbout = null;
+	
+	private static String[] FORMATS = new String[]{"yyyy-MM-dd'T'HH:mm:ss.SSS",
+		"yyyy.MM.dd G 'at' HH:mm:ss z",
+		"yyyyy.MMMMM.dd GGG hh:mm aaa",
+		"EEE, d MMM yyyy HH:mm:ss Z",
+		"yyMMddHHmmssZ"};
+	
+	public LogEntry(JSONObject json){
+		source = (String)json.get("@source");
+		timestamp = parseDate((String)json.get("@timestamp"));
+		sourceHost = (String)json.get("@source_host");
+		sourcePath = (String)json.get("@source_path");
+		message = (String)json.get("@message");
+		type = (String)json.get("@type");
+		JSONArray array = (JSONArray)json.get("@tags");
+		tags.addAll(array);
+		JSONObject fieldsJson = (JSONObject)json.get("@fields");
+		this.fields.putAll(fieldsJson);
+	}
+	
+	private Date parseDate(String value){
+		for(int i = 0; i < FORMATS.length; i++){
+			SimpleDateFormat format = new SimpleDateFormat(FORMATS[i]);
+			Date temp;
+			try {
+				temp = format.parse(value);
+				if(temp != null)
+					return temp;
+			} catch (ParseException e) { /* try the next format */ }
+		}
+		LOG.error("Could not parse timestamp for log");
+		return null;
+	}
+	
+
+	public boolean isFilter() {
+		return filter;
+	}
+
+	public void setFilter(boolean filter) {
+		this.filter = filter;
+	}
+
+	public NotificationDetails getNotificationDetails() {
+		return notifyAbout;
+	}
+
+	public void notifyAbout(NotificationDetails notifyAbout) {
+		this.notifyAbout = notifyAbout;
+	}
+
+	public String getSource() {
+		return source;
+	}
+
+	public List<String> getTags() {
+		return tags;
+	}
+
+	public Map<String, String> getFields() {
+		return fields;
+	}
+	
+	public void addField(String name, String value){
+		fields.put(name, value);
+	}
+	
+	public void addTag(String tag){
+		tags.add(tag);
+	}
+
+	public Date getTimestamp() {
+		return timestamp;
+	}
+
+	public void setTimestamp(Date timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	public String getSourceHost() {
+		return sourceHost;
+	}
+
+	public String getSourcePath() {
+		return sourcePath;
+	}
+
+	public String getMessage() {
+		return message;
+	}
+
+	public void setSource(String source) {
+		this.source = source;
+	}
+
+	public void setSourceHost(String sourceHost) {
+		this.sourceHost = sourceHost;
+	}
+
+	public void setSourcePath(String sourcePath) {
+		this.sourcePath = sourcePath;
+	}
+
+	public String getType() {
+		return type;
+	}
+
+	public void setType(String type) {
+		this.type = type;
+	}
+}

File src/main/java/storm/cookbook/log/model/NotificationDetails.java

+package storm.cookbook.log.model;
+
+public class NotificationDetails {
+	
+	private String to;
+	
+	private Severity severity;
+	
+	private String message;
+	
+	public NotificationDetails(String to, Severity severity, String message){
+		this.to = to;
+		this.severity = severity;
+		this.message = message;
+	}
+
+	public String getTo() {
+		return to;
+	}
+
+	public Severity getSeverity() {
+		return severity;
+	}
+
+	public String getMessage() {
+		return message;
+	}
+}

File src/main/java/storm/cookbook/log/model/Severity.java

+package storm.cookbook.log.model;
+
+public enum Severity {
+	
+	LOW,MEDIUM,HIGH
+
+}

File src/main/resources/Syslog.drl

+package storm.cookbook.log.rules
+
+import storm.cookbook.log.model.LogEntry;
+import java.util.regex.Matcher
+import java.util.regex.Pattern
+
+
+rule "Host Correction"
+
+    when
+        l: LogEntry(sourceHost == "localhost")
+    then
+        l.setSourceHost("localhost.example.com");
+
+end
+
+rule "Filter By Type"
+    when
+        l: LogEntry(type != "syslog")
+    then
+        l.setFilter(true);
+end
+
+rule "Extract Fields"
+	salience -100 // lower salience so this rule fires after the filter rules
+	when
+		l: LogEntry(filter != true)
+	then
+		String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
+		Matcher matcher = Pattern.compile(logEntryPattern).matcher(l.getMessage());
+		if(matcher.find()){
+			l.addField("_pid",matcher.group(1));
+			l.addField("_src",matcher.group(2));
+		}
+end

File src/main/resources/log4j.properties

-log4j.rootLogger=DEBUG,DRFA
-
-
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-
-log4j.appender.DRFA.File=logs/cassandra-tutorial.log
-
-# Rollver at midnight
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
-# Debugging Pattern format
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
-
-# stdout
-log4j.category.com.datastax=DEBUG
-# Uncomment to see Hector debug information 
-log4j.category.me.prettyprint=DEBUG
-log4j.category.org.apache=DEBUG

File src/main/resources/testData1.json

+{
+"@source":"file://logServer/var/log/auth.log",
+"@tags":[],
+"@fields":{},
+"@timestamp":"2013-01-04T11:09:16.171Z",
+"@source_host":"logServer",
+"@source_path":"/var/log/auth.log",
+"@message":"Jan  4 11:08:04 logServer sudo:  vagrant : TTY=pts/1 ; PWD=/home/vagrant ; USER=root ; COMMAND=/usr/bin/nano shipper.conf",
+"@type":"syslog"
+}

File src/test/java/storm/cookbook/CassandraSandbox.java

-package storm.cookbook;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Properties;
-
-import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
-import me.prettyprint.cassandra.serializers.CompositeSerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.service.ThriftKsDef;
-import me.prettyprint.hector.api.Cluster;
-import me.prettyprint.hector.api.HConsistencyLevel;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.Composite;
-import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
-import me.prettyprint.hector.api.ddl.ColumnType;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.MutationResult;
-import me.prettyprint.hector.api.mutation.Mutator;
-
-import org.json.simple.JSONObject;
-import org.junit.Test;
-
-public class CassandraSandbox {
-
-	protected static Cluster cluster;
-	protected static Keyspace keyspace;
-	protected static Properties properties;
-
-	static StringSerializer stringSerializer = StringSerializer.get();
-	static LongSerializer longSerializer = LongSerializer.get();
-	static CompositeSerializer compositeSerializer = CompositeSerializer.get();
-
-	private static final String CF = "FileCountsByDay";
-
-	@Test
-	public void GenJSON() {
-		JSONObject content = new JSONObject();
-		JSONObject subElement = new JSONObject();
-		content.put("Test", subElement);
-		subElement.put("Date", "1");
-		System.out.println(content.toJSONString());
-	}
-
-	@Test
-	public void test() {
-		properties = new Properties();
-		try {
-			properties.load(CassandraSandbox.class
-					.getResourceAsStream("/cassandra.properties"));
-		} catch (IOException ioe) {
-			ioe.printStackTrace();
-		}
-
-		cluster = HFactory.getOrCreateCluster(
-				properties.getProperty("cluster.name", "DefaultCluster"),
-				properties.getProperty("cluster.hosts", "127.0.0.1:9160"));
-		ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
-		ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);
-
-		String keyspaceName = properties.getProperty("logging.keyspace",
-				"Logging");
-
-		KeyspaceDefinition keyspaceDef = cluster.describeKeyspace(keyspaceName);
-
-		Calendar today = Calendar.getInstance();
-
-		String key = String.format("%04d:%02d:%02d", today.get(Calendar.YEAR),
-				today.get(Calendar.MONTH), today.get(Calendar.DAY_OF_MONTH));
-
-		if (keyspaceDef == null) {
-			ColumnFamilyDefinition cfDef = HFactory
-					.createColumnFamilyDefinition(properties.getProperty(
-							"logging.keyspace", "Logging"), CF,
-							ComparatorType.UTF8TYPE);
-			cfDef.setDefaultValidationClass(ComparatorType.COUNTERTYPE
-					.getClassName());
-			cfDef.setColumnType(ColumnType.STANDARD);
-
-			KeyspaceDefinition newKeyspace = HFactory.createKeyspaceDefinition(
-					keyspaceName, ThriftKsDef.DEF_STRATEGY_CLASS, 1,
-					Arrays.asList(cfDef));
-			cluster.addKeyspace(newKeyspace, true);
-
-			keyspace = HFactory.createKeyspace(keyspaceName, cluster, ccl);
-		}
-
-		if (keyspace == null)
-			keyspace = HFactory.createKeyspace(keyspaceName, cluster, ccl);
-
-		Mutator<String> mutator = HFactory.createMutator(keyspace,
-				StringSerializer.get());
-		mutator.incrementCounter(key, CF,
-				"localhost:/var/log/apache2/access.log", 1);
-
-		MutationResult mr = mutator.execute();
-		System.out.println(mr.toString());
-
-		cluster.getConnectionManager().shutdown();
-	}
-
-}

File src/test/java/storm/cookbook/log/IntegrationTestTopology.java

+package storm.cookbook.log;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import redis.clients.jedis.Jedis;
+import backtype.storm.utils.Utils;
+
+/**
+ * The integration test injects onto the input queue and introduces a test
+ * bolt that simply persists each tuple as a JSON object onto an output
+ * queue. Note that the test is parameter driven, but the cluster is only
+ * shut down once all tests have run.
+ */
+public class IntegrationTestTopology {
+
+	public static final String REDIS_CHANNEL = "TestLogBolt";
+
+	private static Jedis jedis;
+	private static LogTopology topology = new LogTopology();
+	private static TestBolt testBolt = new TestBolt(REDIS_CHANNEL);
+
+	@BeforeClass
+	public static void setup() {
+		// We want all output tuples coming to the mock for testing purposes
+		topology.getBuilder().setBolt("testBolt", testBolt, 1)
+				.globalGrouping("logstashBolt");
+		// run in local mode, but we will shut the cluster down when we are
+		// finished
+		topology.runLocal(0);
+		// jedis is required for input and output of the cluster
+		jedis = new Jedis("localhost",
+				Integer.parseInt(LogTopology.DEFAULT_JEDIS_PORT));
+		jedis.connect();
+		jedis.flushDB();
+		// give it some time to startup before running the tests.
+		Utils.sleep(5000);
+	}
+
+	@AfterClass
+	public static void shutDown() {
+		topology.shutDownLocal();
+		jedis.disconnect();
+	}
+
+
+	@Test
+	public void inputOutputClusterTest() throws IOException {
+		String testData = UnitTestUtils.readFile("/testData1.json");
+		jedis.rpush("log", testData);
+		Utils.sleep(3000);
+		String data = jedis.rpop(REDIS_CHANNEL);
+		System.out.println(data);
+	}
+
+}
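
inputOutputClusterTest above only prints whatever comes back on the output channel. A small strengthening, assuming org.junit.Assert.assertNotNull is statically imported, would fail the test when nothing arrived:

    // Sketch: assert the round trip instead of printing it.
    String data = jedis.rpop(REDIS_CHANNEL);
    assertNotNull("no tuple arrived on " + REDIS_CHANNEL, data);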

File src/test/java/storm/cookbook/log/TestBolt.java

+package storm.cookbook.log;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.json.simple.JSONArray;
+
+import redis.clients.jedis.Jedis;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+public final class TestBolt extends BaseRichBolt {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = Logger.getLogger(TestBolt.class);
+	
+	private static Jedis jedis;
+	
+	private String channel;
+	
+	public TestBolt(String channel){
+		this.channel = channel;
+	}
+	
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    	jedis = new Jedis("localhost", Integer.parseInt(LogTopology.DEFAULT_JEDIS_PORT));
+    	jedis.connect();
+    }
+    
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+	@Override
+	public void execute(Tuple input) {
+		List objects = input.getValues();
+		objects.add(0, input.getSourceComponent());
+		jedis.rpush(channel, JSONArray.toJSONString(objects));
+		
+	}
+    
+}

File src/test/java/storm/cookbook/log/TestLogEntry.java

+package storm.cookbook.log;
+
+import static org.junit.Assert.*;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.junit.Test;
+
+import storm.cookbook.log.model.LogEntry;
+
+public class TestLogEntry {
+
+	@Test
+	public void testFromJSON() throws Exception {
+		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+		Date test = format.parse("2013-01-04T11:09:16.171");
+		String testData = UnitTestUtils.readFile("/testData1.json");
+		JSONObject obj=(JSONObject) JSONValue.parse(testData);
+		LogEntry entry = new LogEntry(obj);
+		assertEquals("file://logServer/var/log/auth.log", entry.getSource());
+		assertEquals(0,entry.getTags().size());
+		assertEquals(0,entry.getFields().size());
+		assertEquals(test, entry.getTimestamp());
+		assertEquals("logServer",entry.getSourceHost());
+		assertEquals("/var/log/auth.log",entry.getSourcePath());
+		assertNotNull(entry.getMessage());
+		assertEquals("syslog", entry.getType());
+	}
+
+}

File src/test/java/storm/cookbook/log/TestSyslogRules.java

+package storm.cookbook.log;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.drools.KnowledgeBase;
+import org.drools.KnowledgeBaseFactory;
+import org.drools.builder.KnowledgeBuilder;
+import org.drools.builder.KnowledgeBuilderFactory;
+import org.drools.builder.ResourceType;
+import org.drools.io.ResourceFactory;
+import org.drools.runtime.StatelessKnowledgeSession;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import storm.cookbook.log.model.LogEntry;
+
+public class TestSyslogRules {
+	
+	private StatelessKnowledgeSession ksession;
+	
+	@Before
+	public void setupDrools(){
+		KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+		kbuilder.add( ResourceFactory.newClassPathResource( "/Syslog.drl", 
+		              getClass() ), ResourceType.DRL );
+		if ( kbuilder.hasErrors() ) {
+		    fail(kbuilder.getErrors().toString());
+		}
+		KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
+		
+		kbase.addKnowledgePackages( kbuilder.getKnowledgePackages() );	
+		ksession = kbase.newStatelessKnowledgeSession();
+	}
+	
+	private LogEntry getEntry() throws IOException{
+		String testData = UnitTestUtils.readFile("/testData1.json");
+		JSONObject obj=(JSONObject) JSONValue.parse(testData);
+		LogEntry entry = new LogEntry(obj);
+		return entry;
+	}
+
+	@Test
+	public void testHostCorrection() throws IOException {
+		LogEntry entry = getEntry();
+		entry.setSourceHost("localhost");
+		ksession.execute( entry );
+		assertEquals("localhost.example.com",entry.getSourceHost());
+	}
+	
+	@Test
+	public void testNonHostCorrection() throws IOException {
+		LogEntry entry = getEntry();
+		ksession.execute( entry );
+		assertEquals("logServer",entry.getSourceHost());
+	}
+	
+	@Test
+	public void testFilter() throws IOException {
+		LogEntry entry = getEntry();
+		entry.setType("other");
+		ksession.execute( entry );
+		assertEquals(true,entry.isFilter());
+	}
+	
+	@Test
+	public void testDontFilter() throws IOException {
+		LogEntry entry = getEntry();
+		ksession.execute( entry );
+		assertEquals(false,entry.isFilter());
+	}
+
+}

File src/test/java/storm/cookbook/log/UnitTestUtils.java

+package storm.cookbook.log;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+public class UnitTestUtils {
+	
+	public static String readFile(String file) throws IOException {
+		BufferedReader reader = new BufferedReader(new InputStreamReader(
+				UnitTestUtils.class.getResourceAsStream(file)));
+		try {
+			StringBuilder stringBuilder = new StringBuilder();
+			String ls = System.getProperty("line.separator");
+			String line;
+			while ((line = reader.readLine()) != null) {
+				stringBuilder.append(line);
+				stringBuilder.append(ls);
+			}
+			return stringBuilder.toString();
+		} finally {
+			reader.close();
+		}
+	}
+
+}