Commits

György Kohut committed 3ebb0d5

Add message and byte counters

  • Participants
  • Parent commits 6a0e54a

Comments (0)

Files changed (1)

File src/main/java/org/honeynet/hpfeeds/Hpfeeds.java

 	private Reader reader;
 	private boolean run = false;
 	
+	
+	private long writtenMessages = 0;
+	private long writtenBytes = 0;
 	private String brokerName;
 	
 	
 		sc = SocketChannel.open();
 		sc.socket().connect(new InetSocketAddress(host, port), TIMEOUT);
 		
+		Reader oldReader = reader; 
 		reader = new Reader(sc);
+		if (oldReader != null) {
+			reader.setReadMessages(oldReader.getReadMessages());
+			reader.setReadBytes(oldReader.getReadBytes());
+		}
 		
 		Message msg = reader.next();
 		if (msg.opcode != OP_INFO) throw new InvalidStateException("expected OP_INFO");
 		
 		brokerName = name;
 		
-		sc.write(msgauth(rand, ident, secret));
+		writtenBytes += sc.write(msgauth(rand, ident, secret));
+		writtenMessages++;
 	}
 	
 	public void subscribe(String[] channels) throws IOException, InvalidStateException {
 		if (!sc.isConnected()) throw new InvalidStateException("not connected");
 		for (String c : channels) {
-			sc.write(msgsubscribe(ident, c));
+			writtenBytes += sc.write(msgsubscribe(ident, c));
+			writtenMessages++;
 		}
 	}
 	
 	public void publish(String[] channels, ByteBuffer content) throws IOException, InvalidStateException, LargeMessageException {
 		if (!sc.isConnected()) throw new InvalidStateException("not connected");
 		for (String c : channels) {
-			sc.write(msgpublish(ident, c, content));
+			writtenBytes += sc.write(msgpublish(ident, c, content));
+			writtenMessages++;
 		}
 	}
 	
 	}
 	
 	
+	public long getReadMessages() {
+		if (reader == null) return 0;
+		return reader.getReadMessages();
+	}
+	
+	public long getReadBytes() {
+		if (reader == null) return 0;
+		return reader.getReadBytes();
+	}
+	
+	public long getWrittenMessages() {
+		return writtenMessages;
+	}
+	
+	public long getWrittenBytes() {
+		return writtenBytes;
+	}
+	
+	
 	public static ByteBuffer msghdr(byte opcode, ByteBuffer content) {
 		int msgLen = content.remaining() + 5;
 		ByteBuffer msg = ByteBuffer.allocate(msgLen);
 		private ReadableByteChannel stream;
 		private int maxMessageSize;
 		
+		private long readMessages = 0;
+		private long readBytes = 0;
 		
 		public Reader(ReadableByteChannel stream) {
 			this(stream, MAX_MESSAGE_SIZE);
 			byte[] content = new byte[len - 5];
 			fetch(ByteBuffer.wrap(content));
 			
+			readMessages++;
 			return new Message(len, opcode, content);
 		}
 
 			while ((r = stream.read(buf)) != -1) {
 				if (r == 0) throw new ReadTimeOutException();
 				n += r;
+				readBytes += r;
 				if (n < s) {
 					continue;
 				}
 			}
 			throw new EOSException();
 		}
+		
+		
+		public long getReadMessages() {
+			return readMessages;
+		}
+		
+		public long getReadBytes() {
+			return readBytes;
+		}
+		
+		public void setReadMessages(long readMessages) {
+			this.readMessages = readMessages;
+		}
+		
+		public void setReadBytes(long readBytes) {
+			this.readBytes = readBytes;
+		}
 	}