Commits

Boris FELD committed 5ce1578

Simplify publish message format

Comments (0)

Files changed (2)

src/zeromq/src/main/java/org/janusproject/kernel/network/zeromq/zeromq/ZeroMQNetworkAdapter.java

 	@Override
 	public void informLocalRoleTaken(GroupAddress groupAddress,
 			Class<? extends Role> role, AgentAddress agentAddress) {
-		this.node.logger.info("Agent " + agentAddress.getUUID() + " has take "
+		this.node.logger.info("Agent " + agentAddress + "/" + agentAddress.getUUID() + " has take "
 				+ role.getName());
 		this.node.sub_socket.subscribe(agentAddress.getUUID().toString().getBytes());
 
 
 	@Override
 	public Address sendMessage(Message message) {
-		// TODO Auto-generated method stub
-		System.out.println("Here ? sendMessage");
+		this.logger.info("Send message: " + message);
+		this.node.publish(message.getReceiver().getUUID().toString(), "message",
+				SerializationUtil.encode(message).getBytes());
 		return null;
 	}
 
 	@Override
 	public void broadcastMessage(Message message) {
-		System.out.println("Here ? Broadact message");
+		System.out.println("Here ? Broadcast message");
 		// TODO Auto-generated method stub
 
 	}
 		data.put("leaveConditions", leaveConditions);
 		data.put("membership", SerializationUtil.encode(membership));
 		data.put("persistent", Boolean.valueOf(this.janusProperties.getProperty(JanusProperty.GROUP_PERSISTENCE)));
-		this.node.publish("localGroupCreated", data);
+		this.node.publish("application", "localGroupCreated", data);
 		this.node.sub_socket.subscribe(ga.getUUID().toString().getBytes());
 		return StatusFactory.ok("ok");
 	}

src/zeromq/src/main/java/org/janusproject/kernel/network/zeromq/zeromq/ZeroMQNode.java

 	}
 
 	private void subscribeToAdministrativesTopic() {
-		this.sub_socket.subscribe("localGroupCreated".getBytes());
+		this.sub_socket.subscribe("application".getBytes());
 	}
 
 	private Map<String, Object> register_infos() {
 
 		// Sub socket
 		if (this.poller.pollin(this.sub_pollin_id)) {
+			String dest = new String(this.sub_socket.recv(0));
 			String message_type = new String(this.sub_socket.recv(0));
 			message = this.sub_socket.recv(0);
-			this.process_sub_message(message_type, fromBytes(message));
+			this.process_sub_message(dest, message_type, fromBytes(message));
 		}
 		// Server socket
 		if (this.poller.pollin(this.server_pollin_id)) {
 	}
 
 	@SuppressWarnings("unchecked")
-	private void process_sub_message(String message_type,
+	private void process_sub_message(String dest, String message_type,
 			Map<String, Object> data) {
-		this.logger.info("Process sub message " + message_type + "/" + data);
+		this.logger.info("Process sub message to  " + dest + " " + message_type + "/" + data);
 		if (message_type.equals("localGroupCreated")) {
 			String organizationClass = (String) data.get("organization");
 			UUID uuid = UUID.fromString((String) data.get("groupId"));
 			}
 		}
 	}
-
-	public void publish(String message_type, Map<String, Object> data) {
-		this.logger.info(String.format("Publish %s with data %s", message_type,
-				fromMap(data)));
+	
+	public void publish(String dest, String message_type, byte[] data) {
+		this.logger.info(String.format("Publish %s with data %s to %s", message_type,
+				new String(data), dest));
+		this.pub_socket.send(dest.getBytes(), ZMQ.SNDMORE);
 		this.pub_socket.send(message_type.getBytes(), ZMQ.SNDMORE);
-		this.pub_socket.send(fromMap(data).getBytes(), 0);
+		this.pub_socket.send(data, 0);
+	}
+
+	public void publish(String dest, String message_type, Map<String, Object> data) {
+		this.publish(dest, message_type, fromMap(data).getBytes());
 	}
 
 	private String fromMap(Map<String, Object> m) {