Commits

Boris FELD committed 775126f

Use application name in ZeroMQNode

Comments (0)

Files changed (2)

src/zeromq/src/main/java/org/janusproject/kernel/network/zeromq/zeromq/ZeroMQNetworkAdapter.java

 		data.put("membership", SerializationUtil.encode(membership));
 		data.put("persistent", Boolean.valueOf(this.janusProperties
 				.getProperty(JanusProperty.GROUP_PERSISTENCE)));
-		this.node.publish("application", "localGroupCreated", data);
+		this.node.applicationPublish("localGroupCreated", data);
 		this.node.sub_socket.subscribe(ga.getUUID().toString().getBytes());
 		return StatusFactory.ok("ok");
 	}
 	@Override
 	public void setJanusProperties(JanusProperties properties) {
 		this.janusProperties = properties;
+		this.node.setAppName(properties.getProperty(JanusProperty.JANUS_APPLICATION_NAME));
 	}
 
 	// Deprecated functions

src/zeromq/src/main/java/org/janusproject/kernel/network/zeromq/zeromq/ZeroMQNode.java

 
 	private NetworkListener listener;
 
+	private String appName;
+
 	public void init(AgentAddress kernelAddress) {
 
 		// Node infos
 		this.server_pollin_id = this.poller.register(this.server_socket,
 				Poller.POLLIN);
 
-		// Connect to topics
-		this.subscribeToAdministrativesTopic();
-
 		// Peers
 		this.peers = new TreeMap<UUID, ZeroMQPeerNode>();
 	}
 
-	private void subscribeToAdministrativesTopic() {
-		this.sub_socket.subscribe("application".getBytes());
-	}
-
 	private Map<String, Object> register_infos() {
 		Map<String, Object> data = new HashMap<String, Object>();
 		data.put("id", this.id);
 
 	public void register() {
 		System.out.println("Register");
+		if(this.appName != null) {
+			this.sub_socket.subscribe(this.appName.getBytes());
+		}
 		byte[] buf = null;
 		DatagramPacket packet;
 		logger.info("Send " + fromMap(register_infos()));
 		}
 	}
 
-	private void process_server_message(Map<String, Object> data) {
-		// TODO Auto-generated method stub
-
-	}
-
 	@SuppressWarnings("unchecked")
 	private void process_sub_message(String dest, String message_type,
 			byte[] data) {
 		this.pub_socket.send(message_type.getBytes(), ZMQ.SNDMORE);
 		this.pub_socket.send(data, 0);
 	}
+	
+	public void applicationPublish(String message_type, Map<String, Object> data) {
+		this.publish(this.appName, message_type, data);
+	}
 
 	public void publish(String dest, String message_type,
 			Map<String, Object> data) {
 		this.publish(dest, message_type, fromMap(data).getBytes());
 	}
 
+	// Encoding/Decoding utils
 	private String fromMap(Map<String, Object> m) {
 		ObjectMapper mapper = new ObjectMapper();
 		Writer strWriter = new StringWriter();
 					leaveConditions, membership, persistence, groupName);
 		}
 	}
+
+	public void setAppName(String appName) {
+		this.appName = appName;
+	}
 }