Commits

György Kohut committed a970be7

Initial commit

Comments (0)

Files changed (9)

+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+  <classpathentry kind="src" path="src/main/java" including="**/*.java"/>
+  <classpathentry kind="output" path="target/classes"/>
+  <classpathentry kind="src" path="src/test/java" output="target/test-classes" including="**/*.java"/>
+  <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+  <classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-api/1.6.4/slf4j-api-1.6.4.jar"/>
+  <classpathentry kind="var" path="M2_REPO/ch/qos/logback/logback-classic/1.0.3/logback-classic-1.0.3.jar"/>
+  <classpathentry kind="var" path="M2_REPO/ch/qos/logback/logback-core/1.0.3/logback-core-1.0.3.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/fasterxml/jackson/dataformat/jackson-dataformat-csv/2.0.0/jackson-dataformat-csv-2.0.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/fasterxml/jackson/core/jackson-core/2.0.0/jackson-core-2.0.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/fasterxml/jackson/core/jackson-databind/2.0.0/jackson-databind-2.0.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/fasterxml/jackson/core/jackson-annotations/2.0.0/jackson-annotations-2.0.0.jar"/>
+</classpath>
+target/*
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+  <name>hpfeeds-java</name>
+  <comment>NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.</comment>
+  <projects/>
+  <buildSpec>
+    <buildCommand>
+      <name>org.eclipse.jdt.core.javabuilder</name>
+    </buildCommand>
+  </buildSpec>
+  <natures>
+    <nature>org.eclipse.jdt.core.javanature</nature>
+  </natures>
+</projectDescription>

.settings/org.eclipse.jdt.core.prefs

+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.debug.lineNumber=generate
+org.eclipse.jdt.core.compiler.debug.localVariable=generate
+org.eclipse.jdt.core.compiler.debug.sourceFile=generate
+org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
+org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
+org.eclipse.jdt.core.compiler.source=1.6
+== Compiing ==
+ 1. mvn compile
+ 2. mvn dependency:copy-dependencies 
+
+== Using the examles ==
+ 1. java -cp target/dependency/*:target/classes org.honeynet.hpfeeds.ExampleConsumer -h <host> -p <port> -i <ident> -s <secret> -c <channel>[,channel2,channel3 ...]
+ 2. java -cp target/dependency/*:target/classes org.honeynet.hpfeeds.ExampleCsvConsumer -h <host> -p <port> -i <ident> -s <secret> -c <channel>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.honeynet</groupId>
+  <artifactId>hpfeeds-java</artifactId>
+  <version>0.1.0</version>
+  <packaging>jar</packaging>
+
+  <!-- <name>hpfeeds-java</name> -->
+  <!-- <url>?</url> -->
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.3.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.6.4</version>
+    </dependency>
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <version>1.0.3</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-csv</artifactId>
+      <version>2.0.0</version>
+    </dependency>
+  </dependencies>
+
+</project>

src/main/java/org/honeynet/hpfeeds/ExampleConsumer.java

+package org.honeynet.hpfeeds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.honeynet.hpfeeds.Hpfeeds.EOSException;
+import org.honeynet.hpfeeds.Hpfeeds.InvalidStateException;
+import org.honeynet.hpfeeds.Hpfeeds.LargeMessageException;
+import org.honeynet.hpfeeds.Hpfeeds.ReadTimeOutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExampleConsumer {
+	
+	private static Logger log = LoggerFactory.getLogger(ExampleConsumer.class);
+	
+	
+	public static void printUsage() {
+		System.err.println("usage:  -h <host> -p <port> -i <ident> -s <secret> -c <channel>[,channel2,channel3 ...]");
+	}
+	
+	public static void main(String args[]) throws IOException, ReadTimeOutException, EOSException, LargeMessageException, InvalidStateException {
+		Map<String,String> argMap = new HashMap<String,String>();
+		argMap.put("-h", null);
+		argMap.put("-p", null);
+		argMap.put("-i", null);
+		argMap.put("-s", null);
+		argMap.put("-c", null);
+		
+		String arg = null;
+		for (String a : args) {
+			if (arg == null) {
+				if (argMap.containsKey(a)) {
+					arg = a;
+					continue;
+				}
+				else {
+					System.err.println("invalid argument: " + a);
+					printUsage();
+					System.exit(1);
+				}
+			} else {
+				argMap.put(arg, a);
+				arg = null;
+			}
+		}
+		
+		if (argMap.containsValue(null)) {
+			System.err.println("missing argument(s)");
+			printUsage();
+			System.exit(1);
+		}
+		
+		
+		String host = argMap.get("-h");
+		int port = Integer.parseInt(argMap.get("-p"));
+		String ident = argMap.get("-i");
+		String secret = argMap.get("-s");
+		String[] channels = argMap.get("-c").split(",");
+		
+		
+		Hpfeeds h = new Hpfeeds(host, port, ident, secret);
+		h.connect();
+		h.subscribe(channels);
+		h.run(new ExampleHandler(), new ExampleErrorHandler());
+		
+	}
+	
+	
+	public static class ExampleHandler implements Hpfeeds.MessageHandler {
+		private Pattern pat = Pattern.compile(".*\\P{Print}");
+		
+		@Override
+		public void onMessage(String ident, String chan, ByteBuffer msg) {
+			try {
+				String out = null;
+				boolean notPrintable = false;
+				
+				ByteBuffer buf = msg.slice();
+				int bufLen = buf.remaining();
+				if (bufLen > 20) {
+					buf.limit(20);
+					bufLen = 20;
+				}
+				
+				try {
+					notPrintable = pat.matcher(Hpfeeds.decodeString(buf)).matches();
+				} catch (CharacterCodingException e) {
+					notPrintable = true;
+				}
+				
+				if (notPrintable) {
+					StringBuilder outBuilder = new StringBuilder(64);
+					buf.rewind();
+					for (int i = bufLen ; i != 0 ; i--) {
+						outBuilder.append(String.format("%02x ", buf.get()));
+					}
+					outBuilder.append("...");
+					out = outBuilder.toString();
+				}
+				else {
+					out = Hpfeeds.decodeString(msg);
+				}
+				
+				log.info("publish to {} by {}: {}", new Object[]{chan, ident, out});
+			}
+			catch (CharacterCodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+	
+	
+	public static class ExampleErrorHandler implements Hpfeeds.ErrorHandler {
+		@Override
+		public void onError(ByteBuffer msg) {
+			try {
+				log.error("error message from broker: {}", Hpfeeds.decodeString(msg));
+				System.exit(1);
+			}
+			catch (CharacterCodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+}

src/main/java/org/honeynet/hpfeeds/ExampleCsvConsumer.java

+package org.honeynet.hpfeeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.honeynet.hpfeeds.Hpfeeds.EOSException;
+import org.honeynet.hpfeeds.Hpfeeds.InvalidStateException;
+import org.honeynet.hpfeeds.Hpfeeds.LargeMessageException;
+import org.honeynet.hpfeeds.Hpfeeds.ReadTimeOutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+
+public class ExampleCsvConsumer {
+	
+	private static Logger log = LoggerFactory.getLogger(ExampleCsvConsumer.class);
+	
+	
+	public static void printUsage() {
+		System.err.println("usage:  -h <host> -p <port> -i <ident> -s <secret> -c <channel>");
+	}
+	
+	public static void main(String args[]) throws IOException, EOSException, ReadTimeOutException, LargeMessageException, InvalidStateException {
+		Map<String,String> argMap = new HashMap<String,String>();
+		argMap.put("-h", null);
+		argMap.put("-p", null);
+		argMap.put("-i", null);
+		argMap.put("-s", null);
+		argMap.put("-c", null);
+		
+		String arg = null;
+		for (String a : args) {
+			if (arg == null) {
+				if (argMap.containsKey(a)) {
+					arg = a;
+					continue;
+				}
+				else {
+					System.err.println("invalid argument: " + a);
+					printUsage();
+					System.exit(1);
+				}
+			} else {
+				argMap.put(arg, a);
+				arg = null;
+			}
+		}
+		
+		if (argMap.containsValue(null)) {
+			System.err.println("missing argument(s)");
+			printUsage();
+			System.exit(1);
+		}
+		
+		
+		String host = argMap.get("-h");
+		int port = Integer.parseInt(argMap.get("-p"));
+		String ident = argMap.get("-i");
+		String secret = argMap.get("-s");
+		String channel = argMap.get("-c");
+		
+		
+		Hpfeeds h = new Hpfeeds(host, port, ident, secret);
+		h.connect();
+		h.subscribe(new String[]{channel});
+		h.run(new CsvHandler(), new ExampleErrorHandler());
+		
+	}
+	
+	
+	public static class CsvHandler implements Hpfeeds.MessageHandler {
+		private ObjectMapper jsonObjectMapper = new ObjectMapper();
+		private CsvMapper csvObjectMapper = new CsvMapper();
+		CsvSchema csvShema = csvObjectMapper.schemaFor(Attack.class);
+		
+		private static class Attack {
+			public long timestamp;
+			public String ident, chan;
+			public String daddr, dport, saddr, sport, md5, sha512, url;
+			
+			@Override
+			public String toString() {
+				return String.format("%d %s %s %s %s %s %s %s %s %s", timestamp, ident, chan, daddr, dport, saddr, sport, md5, sha512, url);
+			}
+		}
+		
+		@Override
+		public void onMessage(String ident, String chan, ByteBuffer msg) {
+			long timestamp = new Date().getTime();
+			try {
+				Attack attack = jsonObjectMapper.readValue(new ByteBufferInputStream(msg), Attack.class);
+				attack.timestamp = timestamp;
+				attack.ident = ident;
+				attack.chan = chan;
+				//FIXME double and triple commas?
+				String csv = csvObjectMapper.writer().withSchema(csvShema).writeValueAsString(attack);
+				System.out.print(csv);
+			}
+			catch (IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+
+	
+	public static class ByteBufferInputStream extends InputStream {
+		private ByteBuffer buf;
+		
+		public ByteBufferInputStream(ByteBuffer buf) {
+			this.buf = buf;
+		}
+
+		@Override
+		public int read() throws IOException {
+			try {
+				return buf.get();
+			}
+			catch (BufferUnderflowException e) {
+				return -1;
+			}
+		}
+	}
+	
+	
+	public static class ExampleErrorHandler implements Hpfeeds.ErrorHandler {
+		@Override
+		public void onError(ByteBuffer msg) {
+			try {
+				log.error("error message from broker: {}", Hpfeeds.decodeString(msg));
+				System.exit(1);
+			}
+			catch (CharacterCodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+}

src/main/java/org/honeynet/hpfeeds/Hpfeeds.java

+package org.honeynet.hpfeeds;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Hpfeeds {
+	private final static int MAX_MESSAGE_SIZE = 16 * 1024*1024;
+	private final static int TIMEOUT = 3000;
+	
+	public final static byte OP_ERROR = 0;
+	public final static byte OP_INFO = 1;
+	public final static byte OP_AUTH = 2;
+	public final static byte OP_PUBLISH = 3;
+	public final static byte OP_SUBSCRIBE = 4;
+	
+	
+	private static Charset charset = Charset.forName("UTF-8");
+	private static CharsetEncoder charsetEncoder = charset.newEncoder();
+	private static CharsetDecoder charsetDecoder = charset.newDecoder();
+	
+
+	private String host, ident, secret;
+	private int port;
+	
+	private SocketChannel sc;
+	private Reader reader;
+	private boolean run = false;
+	
+	private String brokerName;
+	
+	
+	private static Logger log = LoggerFactory.getLogger(Hpfeeds.class);
+	
+	
+	
+	
+	public static interface MessageHandler {
+		void onMessage(String ident, String channel, ByteBuffer msg);
+	}
+	
+	public static interface ErrorHandler {
+		void onError(ByteBuffer msg);
+	}
+	
+	
+	public static class InvalidStateException extends Exception {
+		public InvalidStateException() {
+			super();
+		}
+		public InvalidStateException(String string) {
+			super(string);
+		}
+	}
+	public static class EOSException extends Exception {}
+	public static class ReadTimeOutException extends Exception {}
+	public static class LargeMessageException extends Exception {}
+	
+	
+	
+	
+	public Hpfeeds(String host, int port, String ident, String secret) {
+		this.host = host;
+		this.port = port;
+		this.ident = ident;
+		this.secret = secret;
+	}
+	
+	
+	public void connect() throws IOException, EOSException, ReadTimeOutException, LargeMessageException, InvalidStateException {
+		sc = SocketChannel.open();
+		sc.socket().connect(new InetSocketAddress(host, port), TIMEOUT);
+		
+		reader = new Reader(sc);
+		
+		Message msg = reader.next();
+		if (msg.opcode != OP_INFO) throw new InvalidStateException("expected OP_INFO");
+		
+		String name;
+		ByteBuffer rand;
+		ByteBuffer buf = ByteBuffer.wrap(msg.content);
+		int nameLen = buf.get();
+		ByteBuffer nameBuf = buf.slice();
+		nameBuf.limit(nameLen);
+		name = decodeString(nameBuf);
+		buf.position(1 + nameLen);
+		rand = buf.slice();
+		
+		brokerName = name;
+		
+		sc.write(msgauth(rand, ident, secret));
+	}
+	
+	public void subscribe(String[] channels) throws IOException, InvalidStateException {
+		if (!sc.isConnected()) throw new InvalidStateException("not connected");
+		for (String c : channels) {
+			sc.write(msgsubscribe(ident, c));
+		}
+	}
+	
+	public void run(MessageHandler messageHandler, ErrorHandler errorHandler) throws IOException, EOSException, ReadTimeOutException, LargeMessageException, InvalidStateException {
+		if (!sc.isConnected()) throw new InvalidStateException("not connected");
+		
+		run = true;
+		while (run) {
+			Message msg;
+			try {
+				msg = reader.next();
+			}
+			catch (IOException e) {
+				if (!run) return;
+				else throw e;
+			}
+			catch (EOSException e) {
+				if (!run) return;
+				else throw e;
+			}
+			catch (ReadTimeOutException e) {
+				if (!run) return;
+				else throw e;
+			}
+			catch (LargeMessageException e) {
+				if (!run) return;
+				else throw e;
+			}
+			
+			if (msg.opcode == OP_PUBLISH) {
+				String ident, chan;
+				ByteBuffer buf = ByteBuffer.wrap(msg.content);
+				
+				int identLen = buf.get();
+				ByteBuffer identBuf = buf.slice();
+				identBuf.limit(identLen);
+				ident = decodeString(identBuf);
+				buf.position(1 + identLen);
+				buf = buf.slice();
+				int chanLen = buf.get();
+				ByteBuffer chanBuf = buf.slice();
+				chanBuf.limit(chanLen);
+				chan = decodeString(chanBuf);
+				buf.position(1 + chanLen);
+				buf = buf.slice();
+				
+				messageHandler.onMessage(ident, chan, buf);
+			}
+			else if (msg.opcode == OP_ERROR) {
+				errorHandler.onError(ByteBuffer.wrap(msg.content));
+			}
+			else {
+				throw new InvalidStateException("expected OP_PUBLISH or OP_ERROR");
+			}
+		}
+	}
+	
+	public void stop() {
+		run = false;
+	}
+	
+	public void disconnect() throws IOException {
+		sc.close();
+	}
+	
+	
+	public static ByteBuffer msghdr(byte opcode, ByteBuffer content) {
+		int msgLen = content.remaining() + 5;
+		ByteBuffer msg = ByteBuffer.allocate(msgLen);
+		msg.putInt(msgLen);
+		msg.put(opcode);
+		msg.put(content);
+		msg.flip();
+		return msg;
+	}
+	
+	public static ByteBuffer msgsubscribe(String ident, String channel) {
+		byte identLen  = (byte) encodeString(ident).remaining();
+		ByteBuffer buf = encodeString(ident + channel);
+		ByteBuffer msg = ByteBuffer.allocate(1 + buf.remaining());
+		msg.put(identLen);
+		msg.put(buf);
+		msg.flip();
+		return msghdr(OP_SUBSCRIBE, msg);
+	}
+	
+	public static ByteBuffer msgauth(ByteBuffer rand, String ident, String secret) {
+		ByteBuffer identBuf = encodeString(ident);
+		byte identLen  = (byte) identBuf.remaining();
+		MessageDigest md;
+		try {
+			md = MessageDigest.getInstance("SHA-1");
+		}
+		catch (NoSuchAlgorithmException e) {
+			throw new RuntimeException(e);
+		}
+		md.update(rand);
+		md.update(encodeString(secret));
+		ByteBuffer hashBuf = ByteBuffer.wrap(md.digest());
+		ByteBuffer msg = ByteBuffer.allocate(1 + identLen + hashBuf.remaining());
+		msg.put(identLen);
+		msg.put(identBuf);
+		msg.put(hashBuf);
+		msg.flip();
+		return msghdr(OP_AUTH, msg);
+	}
+	
+	
+	public static ByteBuffer encodeString(String s) {
+		try {
+			return charsetEncoder.encode(CharBuffer.wrap(s));
+		}
+		catch (CharacterCodingException e) {
+			throw new RuntimeException(e);
+		}
+	}
+	
+	public static String decodeString(ByteBuffer b) throws CharacterCodingException {
+		return charsetDecoder.decode(b.duplicate()).toString();
+	}
+	
+	
+	
+	
+	public static class Reader {
+		private ReadableByteChannel stream;
+		private int maxMessageSize;
+		
+		
+		public Reader(ReadableByteChannel stream) {
+			this(stream, MAX_MESSAGE_SIZE);
+		}
+		
+		public Reader(ReadableByteChannel stream, int maxMessageSize) {
+			this.stream = stream;
+			this.maxMessageSize = maxMessageSize;
+		}
+		
+		public Message next() throws IOException, EOSException, ReadTimeOutException, LargeMessageException {
+			ByteBuffer headerBuf = ByteBuffer.allocate(5);			
+			fetch(headerBuf);
+			headerBuf.flip();
+			int len = headerBuf.getInt();
+			byte opcode = headerBuf.get();
+			
+			if (len > maxMessageSize) throw new LargeMessageException();
+			
+			byte[] content = new byte[len - 5];
+			fetch(ByteBuffer.wrap(content));
+			
+			return new Message(len, opcode, content);
+		}
+
+		private void fetch(ByteBuffer buf) throws IOException, EOSException, ReadTimeOutException {
+			int s = buf.remaining();
+			int n = 0, r;
+			
+			while ((r = stream.read(buf)) != -1) {
+				if (r == 0) throw new ReadTimeOutException();
+				n += r;
+				if (n < s) {
+					continue;
+				}
+				else {
+					return;
+				}
+			}
+			throw new EOSException();
+		}
+	}
+	
+	
+	public static class Message {
+		public int len;
+		public byte opcode;
+		public byte[] content;
+		
+		public Message(int len, byte opcode, byte[] content) {
+			this.len = len;
+			this.opcode = opcode;
+			this.content = content;
+		}
+	}
+	
+}