Commits

György Kohut committed 5c5d5e7

Implement publish

Comments (0)

Files changed (4)

 == Using the examles ==
  1. java -cp target/dependency/*:target/classes org.honeynet.hpfeeds.ExampleConsumer -h <host> -p <port> -i <ident> -s <secret> -c <channel>[,channel2,channel3 ...]
  2. java -cp target/dependency/*:target/classes org.honeynet.hpfeeds.ExampleCsvConsumer -h <host> -p <port> -i <ident> -s <secret> -c <channel>
+ 3. java -cp target/dependency/*:target/classes org.honeynet.hpfeeds.MD5Consumer -h <host> -p <port> -i <ident> -s <secret> -c <channel>[,channel2,channel3 ...]
+ 4. java -cp target/dependency/*:target/classes org.honeynet.hpfeeds.FilePublisher -h <host> -p <port> -i <ident> -s <secret> -c <channel> -f <file>
 

src/main/java/org/honeynet/hpfeeds/FilePublisher.java

+package org.honeynet.hpfeeds;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.charset.CharacterCodingException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.honeynet.hpfeeds.Hpfeeds.EOSException;
+import org.honeynet.hpfeeds.Hpfeeds.InvalidStateException;
+import org.honeynet.hpfeeds.Hpfeeds.LargeMessageException;
+import org.honeynet.hpfeeds.Hpfeeds.ReadTimeOutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FilePublisher {
+	private final static int MAX_FILE_SIZE = 10 * 1024*1024;
+	
+	private static Logger log = LoggerFactory.getLogger(FilePublisher.class);
+	
+	
+	public static void printUsage() {
+		System.err.println("usage:  -h <host> -p <port> -i <ident> -s <secret> -c <channel> -f <file>");
+	}
+	
+	public static void main(String args[]) throws IOException, EOSException, ReadTimeOutException, LargeMessageException, InvalidStateException {
+		Map<String,String> argMap = new HashMap<String,String>();
+		argMap.put("-h", null);
+		argMap.put("-p", null);
+		argMap.put("-i", null);
+		argMap.put("-s", null);
+		argMap.put("-c", null);
+		argMap.put("-f", null);
+		
+		String arg = null;
+		for (String a : args) {
+			if (arg == null) {
+				if (argMap.containsKey(a)) {
+					arg = a;
+					continue;
+				}
+				else {
+					System.err.println("invalid argument: " + a);
+					printUsage();
+					System.exit(1);
+				}
+			} else {
+				argMap.put(arg, a);
+				arg = null;
+			}
+		}
+		
+		if (argMap.containsValue(null)) {
+			System.err.println("missing argument(s)");
+			printUsage();
+			System.exit(1);
+		}
+		
+		
+		String host = argMap.get("-h");
+		int port = Integer.parseInt(argMap.get("-p"));
+		String ident = argMap.get("-i");
+		String secret = argMap.get("-s");
+		String channel = argMap.get("-c");
+		String file = argMap.get("-f");
+		
+		
+		File f = new File(file);
+		long fs = f.length();
+		if (fs == 0) {
+			System.err.println("file does not exist or has a size of 0");
+			System.exit(1);
+		}
+		if (fs > MAX_FILE_SIZE) {
+			System.err.println("file too large, limit: " + MAX_FILE_SIZE + " bytes");
+			System.exit(1);
+		}		
+		FileChannel fc = new FileInputStream(f).getChannel();
+		
+		
+		final Hpfeeds h = new Hpfeeds(host, port, ident, secret);
+		h.connect();
+		
+		Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					h.run(new DummyMessageHandler(), new ExampleErrorHandler());
+				}
+				catch (IOException e) {
+					throw new RuntimeException(e);
+				}
+				catch (EOSException e) {
+					throw new RuntimeException(e);
+				}
+				catch (ReadTimeOutException e) {
+					throw new RuntimeException(e);
+				}
+				catch (LargeMessageException e) {
+					throw new RuntimeException(e);
+				}
+				catch (InvalidStateException e) {
+					throw new RuntimeException(e);
+				}
+			}
+		});
+		t.setDaemon(true);
+		t.start();
+		
+		ByteBuffer buf = fc.map(MapMode.READ_ONLY, 0, fs);
+		h.publish(new String[]{channel}, buf);
+		
+		h.stop();
+		h.disconnect();
+		System.exit(0);
+	}
+	
+	
+	public static class DummyMessageHandler implements Hpfeeds.MessageHandler {
+		@Override
+		public void onMessage(String ident, String channel, ByteBuffer msg) {
+			try {
+				Thread.sleep(100);
+			}
+			catch (InterruptedException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+	
+	
+	public static class ExampleErrorHandler implements Hpfeeds.ErrorHandler {
+		@Override
+		public void onError(ByteBuffer msg) {
+			try {
+				log.error("error message from broker: {}", Hpfeeds.decodeString(msg));
+				System.exit(1);
+			}
+			catch (CharacterCodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+}

src/main/java/org/honeynet/hpfeeds/Hpfeeds.java

 		}
 	}
 	
+	public void publish(String[] channels, ByteBuffer content) throws IOException, InvalidStateException, LargeMessageException {
+		if (!sc.isConnected()) throw new InvalidStateException("not connected");
+		for (String c : channels) {
+			sc.write(msgpublish(ident, c, content));
+		}
+	}
+	
 	public void stop() {
 		run = false;
 	}
 		return msghdr(OP_AUTH, msg);
 	}
 	
+	public static ByteBuffer msgpublish(String ident, String channel, ByteBuffer content) throws LargeMessageException {
+		ByteBuffer identBuf = encodeString(ident);
+		byte identLen  = (byte) identBuf.remaining();
+		ByteBuffer chanBuf = encodeString(channel);
+		byte chanLen  = (byte) chanBuf.remaining();
+		int msgLen = 1 + identLen + 1 + chanLen + content.remaining();
+		if (msgLen > MAX_MESSAGE_SIZE) throw new LargeMessageException();
+		ByteBuffer buf = ByteBuffer.allocate(msgLen);
+		buf.put(identLen);
+		buf.put(identBuf);
+		buf.put(chanLen);
+		buf.put(chanBuf);
+		buf.put(content);
+		buf.flip();
+		return msghdr(OP_PUBLISH, buf);
+	}
+	
 	
 	public static ByteBuffer encodeString(String s) {
 		try {

src/main/java/org/honeynet/hpfeeds/MD5Consumer.java

+package org.honeynet.hpfeeds;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.honeynet.hpfeeds.Hpfeeds.EOSException;
+import org.honeynet.hpfeeds.Hpfeeds.InvalidStateException;
+import org.honeynet.hpfeeds.Hpfeeds.LargeMessageException;
+import org.honeynet.hpfeeds.Hpfeeds.ReadTimeOutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MD5Consumer {
+	
+	private static Logger log = LoggerFactory.getLogger(MD5Consumer.class);
+	
+	
+	public static void printUsage() {
+		System.err.println("usage:  -h <host> -p <port> -i <ident> -s <secret> -c <channel>[,channel2,channel3 ...]");
+	}
+	
+	public static void main(String args[]) throws IOException, ReadTimeOutException, EOSException, LargeMessageException, InvalidStateException {
+		Map<String,String> argMap = new HashMap<String,String>();
+		argMap.put("-h", null);
+		argMap.put("-p", null);
+		argMap.put("-i", null);
+		argMap.put("-s", null);
+		argMap.put("-c", null);
+		
+		String arg = null;
+		for (String a : args) {
+			if (arg == null) {
+				if (argMap.containsKey(a)) {
+					arg = a;
+					continue;
+				}
+				else {
+					System.err.println("invalid argument: " + a);
+					printUsage();
+					System.exit(1);
+				}
+			} else {
+				argMap.put(arg, a);
+				arg = null;
+			}
+		}
+		
+		if (argMap.containsValue(null)) {
+			System.err.println("missing argument(s)");
+			printUsage();
+			System.exit(1);
+		}
+		
+		
+		String host = argMap.get("-h");
+		int port = Integer.parseInt(argMap.get("-p"));
+		String ident = argMap.get("-i");
+		String secret = argMap.get("-s");
+		String[] channels = argMap.get("-c").split(",");
+		
+		
+		Hpfeeds h = new Hpfeeds(host, port, ident, secret);
+		h.connect();
+		h.subscribe(channels);
+		h.run(new MD5Handler(), new ExampleErrorHandler());
+		
+	}
+	
+	
+	public static class MD5Handler implements Hpfeeds.MessageHandler {
+		private MessageDigest md;
+		
+		public MD5Handler() {
+			try {
+				md = MessageDigest.getInstance("MD5");
+			}
+			catch (NoSuchAlgorithmException e) {
+				throw new RuntimeException(e);
+			}
+		}
+		
+		@Override
+		public void onMessage(String ident, String chan, ByteBuffer msg) {
+			int len = msg.remaining();
+			md.reset();
+			md.update(msg);	
+			
+			String md5 = new BigInteger(1, md.digest()).toString(16);
+			// pad
+			if (md5.length() < 32) {
+				StringBuilder sb = new StringBuilder(32);
+				int i = 32 - md5.length();
+				while (i > 0) {
+					sb.append("0");
+					i--;
+				}
+				sb.append(md5);
+				md5 = sb.toString();
+			}
+			log.info("publish to {} by {}: length={} md5={}", new Object[]{chan, ident, len, md5});
+		}
+	}
+	
+	
+	public static class ExampleErrorHandler implements Hpfeeds.ErrorHandler {
+		@Override
+		public void onError(ByteBuffer msg) {
+			try {
+				log.error("error message from broker: {}", Hpfeeds.decodeString(msg));
+				System.exit(1);
+			}
+			catch (CharacterCodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+}