Commits

Boris FELD committed 5962109

Be sure to not send group creating messages indefinitely

Comments (0)

Files changed (2)

src/zeromq/src/main/java/org/janusproject/kernel/network/zeromq/agent/ZeroMQKernelAgent.java

  */
 package org.janusproject.kernel.network.zeromq.agent;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EventListener;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import org.arakhne.vmutil.locale.Locale;
 import org.janusproject.kernel.address.Address;
 import org.janusproject.kernel.address.AgentAddress;
-import org.janusproject.kernel.agent.Agent;
 import org.janusproject.kernel.agent.AgentActivator;
 import org.janusproject.kernel.agent.KernelAgent;
 import org.janusproject.kernel.configuration.JanusProperties;
 import org.janusproject.kernel.configuration.JanusProperty;
+import org.janusproject.kernel.crio.capacity.Capacity;
 import org.janusproject.kernel.crio.core.GroupAddress;
 import org.janusproject.kernel.crio.core.Organization;
 import org.janusproject.kernel.crio.core.Role;
 import org.janusproject.kernel.repository.RepositoryChangeEvent;
 import org.janusproject.kernel.repository.RepositoryChangeEvent.ChangeType;
 import org.janusproject.kernel.repository.RepositoryChangeListener;
+import org.janusproject.kernel.repository.RepositoryOverlooker;
 import org.janusproject.kernel.status.Status;
 import org.janusproject.kernel.status.StatusFactory;
 import org.janusproject.kernel.util.throwable.Throwables;
  * Agent that represents and run the kernel of the Janus platform.
  * <p>
  * If the kernel agent is suicidable, it means that it will stop its execution
- * if no more other agent exists. If the kernel agent is not suicidable, it
- * will persist even if no more other agent is registered.
+ * if no more other agent exists. If the kernel agent is not suicidable, it will
+ * persist even if no more other agent is registered.
  * <p>
  * This kernel agent supports networking.
  * 
  * @mavengroupid $GroupId$
  * @mavenartifactid $ArtifactId$
  */
-public class ZeroMQKernelAgent extends KernelAgent implements RepositoryChangeListener, NetworkListener {
+public class ZeroMQKernelAgent extends KernelAgent implements
+		RepositoryChangeListener, NetworkListener {
 
 	private static final long serialVersionUID = -8244141754953009460L;
 
 
 	private final NetworkAdapter adapter;
 	private ZeroMQNode node;
+	// XXX: Hack
+	private Set<UUID> existingGroups;
 
 	/**
 	 * Create a kernel agent with the default settings.
 	 *            is a listener on kernel events which may be added at startup.
 	 * @param applicationName
 	 *            is the name of the application supported by this kernel.
-	 * @param node 
+	 * @param node
 	 * @param networkAdapter
-	 *            is the adapter used by this kernel to be connected through a network.
+	 *            is the adapter used by this kernel to be connected through a
+	 *            network.
 	 */
-	ZeroMQKernelAgent(AgentActivator activator, Boolean commitSuicide, EventListener startUpListener, String applicationName, ZeroMQNode node, NetworkAdapter networkAdapter) {
-		super(activator, commitSuicide, null, startUpListener, networkAdapter, applicationName);
+	ZeroMQKernelAgent(AgentActivator activator, Boolean commitSuicide,
+			EventListener startUpListener, String applicationName,
+			ZeroMQNode node, NetworkAdapter networkAdapter) {
+		super(activator, commitSuicide, null, startUpListener, networkAdapter,
+				applicationName);
 		getAddress().setName(Locale.getString(ZeroMQKernelAgent.class, "NAME")); //$NON-NLS-1$
 
 		this.node = node;
-		
+
 		this.adapter = networkAdapter;
 		this.adapter.setNetworkAdapterListener(this);
 
 		this.adapter.setJanusProperties(prop);
 
 		try {
-			this.adapter.initializeNetwork(getKernelContext().getKernelAgent(), getKernelContext().getProperties());
-		}
-		catch (AssertionError ae) {
+			this.adapter.initializeNetwork(getKernelContext().getKernelAgent(),
+					getKernelContext().getProperties());
+		} catch (AssertionError ae) {
 			throw ae;
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			throw new IllegalStateException(e);
 		}
 		getGroupRepository().addRepositoryChangeListener(this);
 		getAgentRepository().addRepositoryChangeListener(this);
+
+		// XXX: Hack
+		this.existingGroups = new HashSet<UUID>();
+
 		submitTaskWithFixedDelay(new GroupCleaner(),
-				EMPTY_GROUP_CLEANING_DELAY*60,
-				EMPTY_GROUP_CLEANING_DELAY*60,
-				TimeUnit.SECONDS);
+				EMPTY_GROUP_CLEANING_DELAY * 60,
+				EMPTY_GROUP_CLEANING_DELAY * 60, TimeUnit.SECONDS);
 	}
-	
+
 	public Status live() {
 		if (this.node != null) {
 			this.node.run();
 		if (evt.getType() == ChangeType.ADD) {
 			if (evt.getChangedObject() instanceof GroupAddress) {
 				try {
-					GroupAddress group = (GroupAddress) evt.getChangedObject();					
+					GroupAddress group = (GroupAddress) evt.getChangedObject();
 					Group groupDescription = getGroupObject(group);
-					this.adapter.informLocalGroupCreated(group,groupDescription.getObtainConditions(),groupDescription.getLeaveConditions(),groupDescription.getMembership());
-				}
-				catch (AssertionError ae) {
+					this.adapter.informLocalGroupCreated(group,
+							groupDescription.getObtainConditions(),
+							groupDescription.getLeaveConditions(),
+							groupDescription.getMembership());
+				} catch (AssertionError ae) {
 					throw ae;
-				}
-				catch (Exception e) {
+				} catch (Exception e) {
 					logger.fine(Throwables.toString(e));
 				}
 			} else if (evt.getChangedObject() instanceof AgentAddress) {
 				try {
 					AgentAddress aagent = (AgentAddress) evt.getChangedObject();
-					Agent agent = this.getAgentRepository().get(aagent);
-					this.adapter.informLocalAgent(aagent, agent);
-				}
-				catch (AssertionError ae) {
+					this.adapter.informLocalAgent(aagent);
+				} catch (AssertionError ae) {
 					throw ae;
-				}
-				catch (Exception e) {
+				} catch (Exception e) {
 					logger.fine(Throwables.toString(e));
 				}
 			}
-		}
-		else if (evt.getType() == ChangeType.REMOVE) {
+		} else if (evt.getType() == ChangeType.REMOVE) {
 			if (evt.getChangedObject() instanceof GroupAddress) {
 				try {
-					GroupAddress group = (GroupAddress) evt.getChangedObject();					
+					GroupAddress group = (GroupAddress) evt.getChangedObject();
 					this.adapter.informLocalGroupRemoved(group);
-				}
-				catch (AssertionError ae) {
+				} catch (AssertionError ae) {
 					throw ae;
-				}
-				catch (Exception e) {
+				} catch (Exception e) {
 					logger.fine(Throwables.toString(e));
 				}
 			} else if (evt.getChangedObject() instanceof AgentAddress) {
 				try {
-					AgentAddress agent = (AgentAddress) evt.getChangedObject();					
+					AgentAddress agent = (AgentAddress) evt.getChangedObject();
 					this.adapter.informLocalAgentRemoved(agent);
-				}
-				catch (AssertionError ae) {
+				} catch (AssertionError ae) {
 					throw ae;
-				}
-				catch (Exception e) {
+				} catch (Exception e) {
 					logger.fine(Throwables.toString(e));
 				}
 			}
 	/*
 	 * (non-Javadoc)
 	 * 
-	 * @see org.janusproject.kernel.network.api.NetworkAdapterListener# distantGroupDiscovered(java.lang.Class, java.util.UUID)
+	 * @see org.janusproject.kernel.network.api.NetworkAdapterListener#
+	 * distantGroupDiscovered(java.lang.Class, java.util.UUID)
 	 */
+	@SuppressWarnings("unchecked")
 	@Override
-	public void distantGroupDiscovered(Class<? extends Organization> organization, UUID id, Collection<? extends GroupCondition> obtainConditions, Collection<? extends GroupCondition> leaveConditions, MembershipService membership, boolean persistent, String groupName) {
-		getOrCreateGroup(id, organization, obtainConditions, leaveConditions, membership, true, persistent, groupName);
+	public void distantGroupDiscovered(
+			Class<? extends Organization> organization, UUID id,
+			Collection<? extends GroupCondition> obtainConditions,
+			Collection<? extends GroupCondition> leaveConditions,
+			MembershipService membership, boolean persistent, String groupName) {
+		// Process message
+		getOrCreateGroup(id, organization, obtainConditions, leaveConditions,
+				membership, true, persistent, groupName);
+		// Re-inform all local groups
+		// XXX: Hack
+		if (!this.existingGroups.contains(id)) {
+			RepositoryOverlooker<GroupAddress> groups = this
+					.getGroupRepository();
+			for (GroupAddress group : groups) {
+				this.adapter.informLocalGroupCreated(group,
+						new ArrayList<GroupCondition>(),
+						new ArrayList<GroupCondition>(), membership);
+				System.out.println("Group: " + group);
+			}
+			this.existingGroups.add(id);
+		}
 	}
 
-	/** {@inheritDoc}
+	/**
+	 * {@inheritDoc}
 	 */
 	@Override
-	public RoleAddress receiveOrganizationalDistantMessage(GroupAddress group, Class<? extends Role> receiverRole, Message message, boolean isBroadcast) {
+	public RoleAddress receiveOrganizationalDistantMessage(GroupAddress group,
+			Class<? extends Role> receiverRole, Message message,
+			boolean isBroadcast) {
 		if (isBroadcast) {
 			forwardBroadcastMessage(message);
 			return null;
 		}
 		Address a = forwardMessage(message);
-		assert(a instanceof RoleAddress);
-		return (RoleAddress)a;
+		assert (a instanceof RoleAddress);
+		return (RoleAddress) a;
 	}
-	
+
 	@Override
 	public String toString() {
 		return String.format("ZeroMQKernelAgent [adapter=%s, node=%s]",
 				adapter, node);
 	}
 
-	/** {@inheritDoc}
+	/**
+	 * {@inheritDoc}
 	 */
 	@Override
-	public AgentAddress receiveAgentAgentDistantMessage(Message message, boolean isBroadcast) {
+	public AgentAddress receiveAgentAgentDistantMessage(Message message,
+			boolean isBroadcast) {
 		if (isBroadcast) {
 			forwardBroadcastMessage(message);
 			return null;
 		}
 		Address a = forwardMessage(message);
-		assert(a instanceof AgentAddress);
-		return (AgentAddress)a;
+		assert (a instanceof AgentAddress);
+		return (AgentAddress) a;
 	}
 
-	/** {@inheritDoc}
+	/**
+	 * {@inheritDoc}
 	 */
 	@Override
 	public void networkError(Throwable e) {
 		}
 	}
 
-	/** {@inheritDoc}
+	/**
+	 * {@inheritDoc}
 	 */
 	@Override
 	public void networkLog(String message) {
 		@SuppressWarnings("synthetic-access")
 		public void run() {
 			ZeroMQKernelAgent.this.removeInactivePersistentGroups(
-					120 * EMPTY_GROUP_CLEANING_DELAY,
-					TimeUnit.SECONDS);
+					120 * EMPTY_GROUP_CLEANING_DELAY, TimeUnit.SECONDS);
 		}
 
 	}

src/zeromq/src/main/java/org/janusproject/kernel/network/zeromq/zeromq/ZeroMQNetworkAdapter.java

 	}
 
 	@Override
-	public void informLocalAgent(AgentAddress agentAdress, Agent agent) {
+	public void informLocalAgent(AgentAddress agentAdress) {
 		// TODO Auto-generated method stub
 	}