Commits

Anonymous committed 6258691

Refactored clustering code. JMS clustering support added (for both JMS 1.1 and JMS 1.0).

Comments (0)

Files changed (18)

-OSCACHE 2.0.0
+OSCACHE 2.0 (beta 2)
+-------------
+New features:
+- Now supports JavaGroups version 2.1.
+- JMS Clustering support has been added [Romulus Pasca].
+- Clustering code has been refactored. As a result of this, some of the
+  clustering configuration has changed since beta 1 - please see the updated
+  clustering documentation for details.
+- Performance enhancement: When running under JRE 1.3.x, the LRUCache will
+  now attempt to use the Jakarta commons collections SequencedHashMap. If the
+  commons-collections.jar is not present then the code resorts to using a
+  LinkedList and a warning is logged. Note that under JRE 1.4.x and higher the
+  commons-collections.jar is not required.
+- Config.getProperties() method added.
+
+Bug fixes:
+- [CACHE-45] Fixed a serialization bug.
+- [CACHE-48] FastCronParser no longer requires JDK 1.4.x.
+- The CachewideEvent was not holding the event date.
+- Prevented an error from being logged in the CachewideEvent handling (even
+  though no problem had occurred).
+- Fixed a subtle bug in the concurrent unit test.
+- The ServletCacheAdministrator's app scope cache is created on startup (via
+  the CacheContextListener).
+
+
+OSCACHE 2.0 (beta 1)
 -------------
 New features:
 - [CACHE-11] Cache grouping support. This allows cache entries to be placed
 - New event CacheMapAccessEvent.STALE_HIT. This event is fired when an attempt
   is made to retrieve and entry from the cache, and the entry is found but is
   stale.
-- Clustering support has been added as an event listener. Currently it is
-  implemented using JavaGroups (www.javagroups.com). To enable, just add the
-  BroadcastingCacheEventListener class to the cache.event.listeners property.
+- Clustering support has been added as an event listener. Currently there are two
+  implementations available, one using JMS as the underlying transport, and one
+  using JavaGroups (www.javagroups.com).
 - Now uses Jakarta Commons Logging for all log messages. This means that the
   cache.debug configuration property is now ignored - use whatever logging
   configuration is appropriate for your logging setup instead. [Fabian Crabus]
 - Performance boost: When OSCache is running on JRE 1.4 or higher, LRUCache
   and FIFOCache use a LinkedHashSet instead of a LinkedList.
 - Japloy is now used to ensure source is consistently formatted
-- Test cases now work on non-windows platforms. Also coverage reports added courtesy of clover.
+- Test cases now work on non-windows platforms. Also coverage reports added
+  courtesy of clover.
 
 Changes that may affect backwards compatibility:
 - The cache.entryevent.classes property in the configuration file has been

lib/build/jms.jar

Binary file added.

src/core/etc/oscache.properties

 
 # CACHE LISTENERS
 #
-# cache.event.listeners=com.opensymphony.oscache.plugins.clustersupport.BroadcastingCacheEventListener,  \
-#                       com.opensymphony.oscache.extra.CacheEntryEventListenerImpl,                      \
-#                       com.opensymphony.oscache.extra.CacheMapAccessEventListenerImpl,                  \
+# cache.event.listeners=com.opensymphony.oscache.plugins.clustersupport.JMSBroadcastingListener,  \
+#                       com.opensymphony.oscache.extra.CacheEntryEventListenerImpl,               \
+#                       com.opensymphony.oscache.extra.CacheMapAccessEventListenerImpl,           \
 #                       com.opensymphony.oscache.extra.ScopeEventListenerImpl
 
 
 # cache.unlimited.disk=false
 
 
-# CLUSTER PROPERTIES
+# JMS CLUSTER PROPERTIES
+#
+# Configuration properties for JMS clustering. See the clustering documentation
+# for more information on these settings.
+#
+#cache.cluster.jms.topic.factory=java:comp/env/jms/TopicConnectionFactory
+#cache.cluster.jms.topic.name=java:comp/env/jms/OSCacheTopic
+#cache.cluster.jms.node.name=node1
+
+# JAVAGROUPS CLUSTER PROPERTIES
 #
 # Configuration properites for the JavaGroups clustering. Only one of these
 # should be specified. Default values (as shown below) will be used if niether
-# property is set. See the JavaGroups project (www.javagroups.com) for more
-# information on these settings.
+# property is set. See the clustering documentation and the JavaGroups project
+# (www.javagroups.com) for more information on these settings.
 #
-#cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.STABLE(desired_avg_gossip=20000):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
+#cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):pbcast.STABLE(desired_avg_gossip=20000):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
 #cache.cluster.multicast.ip=231.12.21.132

src/plugins/clustersupport/java/com/opensymphony/oscache/plugins/clustersupport/AbstractBroadcastingListener.java

+/*
+ * Copyright (c) 2002-2003 by OpenSymphony
+ * All rights reserved.
+ */
+package com.opensymphony.oscache.plugins.clustersupport;
+
+import com.opensymphony.oscache.base.*;
+import com.opensymphony.oscache.base.events.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Date;
+
+/**
+ * Implementation of a CacheEntryEventListener. It broadcasts the flush events
+ * across a cluster to other listening caches. Note that this listener cannot
+ * be used in conjection with session caches.
+ *
+ * @version        $Revision$
+ * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
+ */
+public abstract class AbstractBroadcastingListener implements CacheEntryEventListener, LifecycleAware {
+    private final static Log log = LogFactory.getLog(AbstractBroadcastingListener.class);
+
+    /**
+     * The name to use for the origin of cluster events. Using this ensures
+     * events are not fired recursively back over the cluster.
+     */
+    protected static final String CLUSTER_ORIGIN = "CLUSTER";
+    protected Cache cache = null;
+
+    public AbstractBroadcastingListener() {
+        if (log.isInfoEnabled()) {
+            log.info("AbstractBroadcastingListener registered");
+        }
+    }
+
+    /**
+     * Event fired when an entry is flushed from the cache. This broadcasts
+     * the flush message to any listening nodes on the network.
+     */
+    public void cacheEntryFlushed(CacheEntryEvent event) {
+        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) {
+            if (log.isDebugEnabled()) {
+                log.debug("cacheEntryFlushed called (" + event + ")");
+            }
+
+            sendNotification(new ClusterNotification(ClusterNotification.FLUSH_KEY, event.getKey()));
+        }
+    }
+
+    /**
+     * Event fired when an entry is removed from the cache. This broadcasts
+     * the remove method to any listening nodes on the network, as long as
+     * this event wasn't from a broadcast in the first place.
+     */
+    public void cacheGroupFlushed(CacheGroupEvent event) {
+        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) {
+            if (log.isDebugEnabled()) {
+                log.debug("cacheGroupFushed called (" + event + ")");
+            }
+
+            sendNotification(new ClusterNotification(ClusterNotification.FLUSH_GROUP, event.getGroup()));
+        }
+    }
+
+    public void cachePatternFlushed(CachePatternEvent event) {
+        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) {
+            if (log.isDebugEnabled()) {
+                log.debug("cachePatternFushed called (" + event + ")");
+            }
+
+            sendNotification(new ClusterNotification(ClusterNotification.FLUSH_PATTERN, event.getPattern()));
+        }
+    }
+
+    public void cacheFlushed(CachewideEvent event) {
+        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) {
+            if (log.isDebugEnabled()) {
+                log.debug("cacheFushed called (" + event + ")");
+            }
+
+            sendNotification(new ClusterNotification(ClusterNotification.FLUSH_CACHE, event.getDate()));
+        }
+    }
+
+    // --------------------------------------------------------
+    // The remaining events are of no interest to this listener
+    // --------------------------------------------------------
+    public void cacheEntryAdded(CacheEntryEvent event) {
+    }
+
+    public void cacheEntryRemoved(CacheEntryEvent event) {
+    }
+
+    public void cacheEntryUpdated(CacheEntryEvent event) {
+    }
+
+    public void cacheGroupAdded(CacheGroupEvent event) {
+    }
+
+    public void cacheGroupEntryAdded(CacheGroupEvent event) {
+    }
+
+    public void cacheGroupEntryRemoved(CacheGroupEvent event) {
+    }
+
+    public void cacheGroupRemoved(CacheGroupEvent event) {
+    }
+
+    public void cacheGroupUpdated(CacheGroupEvent event) {
+    }
+
+    /**
+     * Called by the cache administrator class when a cache is instantiated.
+     *
+     * @param cache the cache instance that this listener is attached to.
+     * @param config The cache's configuration details. This allows the event handler
+     * to initialize itself based on the cache settings, and also to receive <em>additional</em>
+     * settings that were part of the cache configuration but that the cache
+     * itself does not care about. If you are using <code>cache.properties</code>
+     * for your configuration, simply add any additional properties that your event
+     * handler requires and they will be passed through in this parameter.
+     *
+     * @throws InitializationException thrown when there was a problem initializing the
+     * listener. The cache administrator will log this error and disable the listener.
+     */
+    public void initialize(Cache cache, Config config) throws InitializationException {
+        this.cache = cache;
+    }
+
+    /**
+     * Handles incoming notification messages. This method should be called by the
+     * underlying broadcasting implementation when a message is received from another
+     * node in the cluster.
+     *
+     * @param message The incoming cluster notification message object.
+     */
+    public void handleClusterNotification(ClusterNotification message) {
+        if (cache == null) {
+            log.warn("A cluster notification (" + message + ") was received, but no cache is registered on this machine. Notification ignored.");
+
+            return;
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Cluster notification (" + message + ") was received.");
+        }
+
+        switch (message.getType()) {
+            case ClusterNotification.FLUSH_KEY:
+                cache.flushEntry((String) message.getData(), CLUSTER_ORIGIN);
+                break;
+            case ClusterNotification.FLUSH_GROUP:
+                cache.flushGroup((String) message.getData(), CLUSTER_ORIGIN);
+                break;
+            case ClusterNotification.FLUSH_PATTERN:
+                cache.flushPattern((String) message.getData(), CLUSTER_ORIGIN);
+                break;
+            case ClusterNotification.FLUSH_CACHE:
+                cache.flushAll((Date) message.getData(), CLUSTER_ORIGIN);
+                break;
+            default:
+                log.error("The cluster notification (" + message + ") is of an unknown type. Notification ignored.");
+        }
+    }
+
+    /**
+     * Called when a cluster notification message is to be broadcast. Implementing
+     * classes should use their underlying transport to broadcast the message across
+     * the cluster.
+     *
+     * @param message The notification message to broadcast.
+     */
+    abstract protected void sendNotification(ClusterNotification message);
+}

src/plugins/clustersupport/java/com/opensymphony/oscache/plugins/clustersupport/BroadcastingCacheEventListener.java

-/*
- * Copyright (c) 2002-2003 by OpenSymphony
- * All rights reserved.
- */
-package com.opensymphony.oscache.plugins.clustersupport;
-
-import com.opensymphony.oscache.base.*;
-import com.opensymphony.oscache.base.events.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Implementation of a CacheEntryEventListener. It broadcasts the flush events
- * to other listening caches. This allows caches to be clustered.
- *
- * @version        $Revision$
- * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
- */
-public class BroadcastingCacheEventListener implements CacheEntryEventListener, LifecycleAware {
-    private final static Log log = LogFactory.getLog(BroadcastingCacheEventListener.class);
-    static ClusterManager cm;
-
-    /**
-     * Reference count to keep track of how many instances of this listener
-     * are in existence. Once this listener is no longer used we can shut
-     * down the cluster manager.
-     */
-    static int referenceCount = 0;
-
-    public BroadcastingCacheEventListener() {
-        log.info("BroadcastingCacheEventListener registered");
-    }
-
-    // --------------------------------------------------------
-    // The remaining events are of no interest to this listener
-    // --------------------------------------------------------
-    public void cacheEntryAdded(CacheEntryEvent event) {
-    }
-
-    /**
-     * Event fired when an entry is flushed from the cache. This broadcasts
-     * the flush message to any listening nodes on the network.
-     */
-    public void cacheEntryFlushed(CacheEntryEvent event) {
-        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !ClusterManager.CLUSTER_ORIGIN.equals(event.getOrigin())) {
-            if (log.isDebugEnabled()) {
-                log.debug("cacheEntryFlushed called (" + event + ")");
-            }
-
-            cm.signalEntryFlush(event.getKey());
-        }
-    }
-
-    public void cacheEntryRemoved(CacheEntryEvent event) {
-    }
-
-    public void cacheEntryUpdated(CacheEntryEvent event) {
-    }
-
-    public void cacheGroupAdded(CacheGroupEvent event) {
-    }
-
-    public void cacheGroupEntryAdded(CacheGroupEvent event) {
-    }
-
-    public void cacheGroupEntryRemoved(CacheGroupEvent event) {
-    }
-
-    /**
-     * Event fired when an entry is removed from the cache. This broadcasts
-     * the remove method to any listening nodes on the network, as long as
-     * this event wasn't from a broadcast in the first place.
-     */
-    public void cacheGroupFlushed(CacheGroupEvent event) {
-        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !ClusterManager.CLUSTER_ORIGIN.equals(event.getOrigin())) {
-            if (log.isDebugEnabled()) {
-                log.debug("cacheGroupFushed called (" + event + ")");
-            }
-
-            cm.signalGroupFlush(event.getGroup());
-        }
-    }
-
-    public void cacheGroupRemoved(CacheGroupEvent event) {
-    }
-
-    public void cacheGroupUpdated(CacheGroupEvent event) {
-    }
-
-    public void cachePatternFlushed(CachePatternEvent event) {
-        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !ClusterManager.CLUSTER_ORIGIN.equals(event.getOrigin())) {
-            if (log.isDebugEnabled()) {
-                log.debug("cachePatternFushed called (" + event + ")");
-            }
-
-            cm.signalPatternFlush(event.getPattern());
-        }
-    }
-
-    public void cacheFlushed(CachewideEvent event) {
-        if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !ClusterManager.CLUSTER_ORIGIN.equals(event.getOrigin())) {
-            if (log.isDebugEnabled()) {
-                log.debug("cacheFushed called (" + event + ")");
-            }
-
-            cm.signalCacheFlush(event.getDate());
-        }
-    }
-
-    /**
-     * Shuts down the {@link ClusterManager} instance being managed by this listener
-     * once this listener is no longer in use.
-     *
-     * @throws FinalizationException
-     */
-    public synchronized void finialize() throws FinalizationException {
-        referenceCount--;
-
-        if (referenceCount == 0) {
-            cm.shutdown();
-        }
-    }
-
-    /**
-     * Initializes the broadcasting listener by creating a {@link ClusterManager}
-     * instance to handle incoming and outgoing messages. If this listener is
-     * used multiple times, it is reference-counted so we know when to shutdown
-     * the <code>ClusterManagaer</code> again.
-     *
-     * @param config An OSCache configuration object.
-     * @throws InitializationException If this listener has already been initialized.
-     */
-    public synchronized void initialize(Cache cache, Config config) throws InitializationException {
-        if (referenceCount == 0) {
-            cm = new ClusterManager(config);
-            cm.setCache(cache);
-        }
-
-        referenceCount++;
-    }
-}

src/plugins/clustersupport/java/com/opensymphony/oscache/plugins/clustersupport/ClusterManager.java

-/*
- * Copyright (c) 2002-2003 by OpenSymphony
- * All rights reserved.
- */
-package com.opensymphony.oscache.plugins.clustersupport;
-
-import com.opensymphony.oscache.base.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.javagroups.Address;
-import org.javagroups.Channel;
-
-import org.javagroups.blocks.NotificationBus;
-
-import java.io.Serializable;
-
-import java.util.Date;
-
-/**
- * Handles the sending of notification messages across the cluster. This
- * implementation is based on the JavaGroups library.
- *
- * @version        $Revision$
- * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
- */
-public final class ClusterManager implements NotificationBus.Consumer {
-    private static final String BUS_NAME = "OSCacheBus";
-    private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
-    private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
-
-    /**
-     * The name to use for the origin of cluster events. Using this ensures
-     * events are not fired recursively back over the cluster.
-     */
-    static final String CLUSTER_ORIGIN = "CLUSTER";
-
-    /**
-     * The first half of the default channel properties. They default channel properties are:
-     * <pre>
-     * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.STABLE(desired_avg_gossip=20000):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
-     * </pre>
-     * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
-     */
-    private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr=";
-
-    /**
-     * The second half of the default channel properties. They default channel properties are:
-     * <pre>
-     * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.STABLE(desired_avg_gossip=20000):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
-     * </pre>
-     * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
-     */
-    private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.STABLE(desired_avg_gossip=20000):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
-    private static final String DEFAULT_MULTICAST_IP = "231.12.21.132";
-    private final Log log = LogFactory.getLog(ClusterManager.class);
-    private Cache cache;
-    private NotificationBus bus;
-    private boolean shuttingDown = false;
-
-    /**
-     * Creates a <code>ClusterManager</code> instance using the supplied
-     * configuration.
-     */
-    ClusterManager(Config config) throws InitializationException {
-        String properties = config.getProperty(CHANNEL_PROPERTIES);
-        String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
-
-        if ((properties == null) && (multicastIP == null)) {
-            multicastIP = DEFAULT_MULTICAST_IP;
-        }
-
-        if (properties == null) {
-            properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST;
-        } else {
-            properties = properties.trim();
-        }
-
-        if (log.isInfoEnabled()) {
-            log.info("Starting a new ClusterManager with properties=" + properties);
-        }
-
-        try {
-            bus = new NotificationBus(BUS_NAME, properties);
-            bus.start();
-            bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
-            bus.setConsumer(this);
-            log.info("ClusterManager started successfully");
-        } catch (Exception e) {
-            throw new InitializationException("Initialization failed: " + e);
-        }
-    }
-
-    /**
-     * We are not using the caching, so we just return something that identifies
-     * us. This method should never be called directly.
-     */
-    public Serializable getCache() {
-        return "ClusterManager: " + bus.getLocalAddress();
-    }
-
-    /**
-     * Handles incoming notification messages. This method should never be called
-     * directly.
-     *
-     * @param serializable The incoming message object.
-     */
-    public void handleNotification(Serializable serializable) {
-        if (!(serializable instanceof ClusterNotification)) {
-            log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored.");
-
-            return;
-        }
-
-        ClusterNotification msg = (ClusterNotification) serializable;
-
-        if (cache == null) {
-            log.warn("A cluster notification (" + msg + ") was received, but no matching cache is registered on this machine. Notification ignored.");
-
-            return;
-        }
-
-        if (log.isInfoEnabled()) {
-            log.info("Cluster notification (" + msg + ") was received.");
-        }
-
-        switch (msg.getType()) {
-            case ClusterNotification.FLUSH_KEY:
-                cache.flushEntry((String) msg.getData(), CLUSTER_ORIGIN);
-                break;
-            case ClusterNotification.FLUSH_GROUP:
-                cache.flushGroup((String) msg.getData(), CLUSTER_ORIGIN);
-                break;
-            case ClusterNotification.FLUSH_PATTERN:
-                cache.flushPattern((String) msg.getData(), CLUSTER_ORIGIN);
-                break;
-            case ClusterNotification.FLUSH_CACHE:
-                cache.flushAll((Date) msg.getData(), CLUSTER_ORIGIN);
-            default:
-                log.error("The cluster notification (" + msg + ") is of an unknown type. Notification ignored.");
-        }
-    }
-
-    /**
-     * A callback that is fired when a new member joins the cluster. This
-     * method should never be called directly.
-     *
-     * @param address The address of the member who just joined.
-     */
-    public void memberJoined(Address address) {
-        log.info("A new member at address '" + address + "' has joined the cluster");
-    }
-
-    /**
-     * A callback that is fired when an existing member leaves the cluster.
-     * This method should never be called directly.
-     *
-     * @param address The address of the member who left.
-     */
-    public void memberLeft(Address address) {
-        log.info("Member at address '" + address + "' left the cluster");
-    }
-
-    /**
-     * Shuts down this <code>ClusterManager</code> instance. Care should be
-     * taken to ensure that no more notification messages will be sent using
-     * this instance once this method is called. The manager however is still
-     * able to receive notification events from other nodes in the cluster
-     * without any risk of problems.
-     */
-    void shutdown() {
-        if (!shuttingDown) {
-            shuttingDown = true;
-            log.info("ClusterManager shutting down...");
-            bus.stop();
-            bus = null;
-            log.info("ClusterManager shutdown complete.");
-        }
-    }
-
-    /**
-     * Broadcasts a flush message for the given cache entry.
-     * place.
-     *
-     * @param key The object key to broadcast a flush notification message for
-     */
-    void signalEntryFlush(String key) {
-        if (log.isDebugEnabled()) {
-            log.debug("flushEntry called for cache key '" + key + "'");
-        }
-
-        if (!shuttingDown) {
-            bus.sendNotification(new ClusterNotification(ClusterNotification.FLUSH_KEY, key));
-        }
-    }
-
-    /**
-     * Broadcasts a flush message for the given cache group.
-     *
-     * @param group The group to broadcast a flush message for
-     */
-    void signalGroupFlush(String group) {
-        if (log.isDebugEnabled()) {
-            log.debug("flushGroup called for cache group '" + group + "'");
-        }
-
-        if (!shuttingDown) {
-            bus.sendNotification(new ClusterNotification(ClusterNotification.FLUSH_GROUP, group));
-        }
-    }
-
-    /**
-     * Broadcasts a flush message for the given cache pattern.
-     *
-     * @param pattern The pattern to broadcast a flush message for
-     */
-    void signalPatternFlush(String pattern) {
-        if (log.isDebugEnabled()) {
-            log.debug("flushPattern called for cache pattern '" + pattern + "'");
-        }
-
-        if (!shuttingDown) {
-            bus.sendNotification(new ClusterNotification(ClusterNotification.FLUSH_PATTERN, pattern));
-        }
-    }
-
-    /**
-     * Broadcasts a flush message for an entire cache
-     *
-     * @param date the date and time the cache was flushed
-     */
-    void signalCacheFlush(Date date) {
-        if (log.isDebugEnabled()) {
-            log.debug("flushCache called");
-        }
-
-        if (!shuttingDown) {
-            bus.sendNotification(new ClusterNotification(ClusterNotification.FLUSH_CACHE, date));
-        }
-    }
-
-    /**
-     * Hold a copy of the cache so we can apply asynchronous messages to it
-     * as they arrive.
-     */
-    public void setCache(Cache cache) {
-        this.cache = cache;
-    }
-}

src/plugins/clustersupport/java/com/opensymphony/oscache/plugins/clustersupport/JMS10BroadcastingListener.java

+/*
+ * Copyright (c) 2002-2003 by OpenSymphony
+ * All rights reserved.
+ */
+package com.opensymphony.oscache.plugins.clustersupport;
+
+import com.opensymphony.oscache.base.Cache;
+import com.opensymphony.oscache.base.Config;
+import com.opensymphony.oscache.base.FinalizationException;
+import com.opensymphony.oscache.base.InitializationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+
+import javax.naming.InitialContext;
+
+/**
+ * A JMS 1.0.x based clustering implementation. This implementation is independent of
+ * the JMS provider and uses non-persistent messages on a publish subscribe protocol.
+ *
+ * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
+ */
+public class JMS10BroadcastingListener extends AbstractBroadcastingListener {
+    private final static Log log = LogFactory.getLog(JMS10BroadcastingListener.class);
+
+    /**
+     * The name of this cluster. Used to identify the sender of a message.
+     */
+    private String clusterNode;
+
+    /**
+     *The JMS connection used
+     */
+    private TopicConnection connection;
+
+    /**
+     * Th object used to publish new messages
+     */
+    private TopicPublisher publisher;
+
+    /**
+     * The current JMS session
+     */
+    private TopicSession publisherSession;
+
+    /**
+     * <p>Called by the cache administrator class when a cache is instantiated.</p>
+     * <p>The JMS broadcasting implementation requires the following configuration
+     * properties to be specified in <code>oscache.properties</code>:
+     * <ul>
+     * <li><b>cache.cluster.topic.factory</b> - The JMS connection factory to use</li>
+     * <li><b>cache.cluster.topic.name</b> - The JMS topic name</li>
+     * <li><b>cache.cluster.name</b> - The name of this node in the cluster. This should
+     * be unique for each node.</li>
+     * Please refer to the clustering documentation for further details on configuring
+     * the JMS clustered caching.</p>
+     *
+     * @param cache the cache instance that this listener is attached to.
+     *
+     * @throws InitializationException thrown when there was a
+     * problem initializing the listener. The cache administrator will log this error and
+     * disable the listener.
+     */
+    public void initialize(Cache cache, Config config) throws InitializationException {
+        super.initialize(cache, config);
+
+        // Get the name of this node
+        clusterNode = config.getProperty("cache.cluster.jms.node.name");
+
+        String topic = config.getProperty("cache.cluster.jms.topic.name");
+        String topicFactory = config.getProperty("cache.cluster.jms.topic.factory");
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting JMS clustering (node name=" + clusterNode + ", topic=" + topic + ", topic factory=" + topicFactory + ")");
+        }
+
+        try {
+            // Make sure you have specified the necessary JNDI properties (usually in
+            // a jndi.properties resource file, or as system properties)
+            InitialContext jndi = new InitialContext();
+
+            // Look up a JMS connection factory
+            TopicConnectionFactory connectionFactory = (TopicConnectionFactory) jndi.lookup(topicFactory);
+
+            // Create a JMS connection
+            connection = connectionFactory.createTopicConnection();
+
+            // Create session objects
+            publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Look up the JMS topic
+            Topic chatTopic = (Topic) jndi.lookup(topic);
+
+            // Create the publisher and subscriber
+            publisher = publisherSession.createPublisher(chatTopic);
+
+            TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
+
+            // Set the message listener
+            subscriber.setMessageListener(new MessageListener() {
+                    public void onMessage(Message message) {
+                        try {
+                            //check the message type
+                            ObjectMessage objectMessage = null;
+
+                            if (!(message instanceof ObjectMessage)) {
+                                log.error("Cannot handle message of type (class=" + message.getClass().getName() + "). Notification ignored.");
+                                return;
+                            }
+
+                            objectMessage = (ObjectMessage) message;
+
+                            //check the message content
+                            if (!(objectMessage.getObject() instanceof ClusterNotification)) {
+                                log.error("An unknown cluster notification message received (class=" + objectMessage.getObject().getClass().getName() + "). Notification ignored.");
+                                return;
+                            }
+
+                            if (log.isDebugEnabled()) {
+                                log.debug(objectMessage.getObject());
+                            }
+
+                            // This prevents the notification sent by this node from being handled by itself
+                            if (!objectMessage.getStringProperty("node.name").equals(clusterNode)) {
+                                //now handle the message
+                                ClusterNotification notification = (ClusterNotification) objectMessage.getObject();
+                                handleClusterNotification(notification);
+                            }
+                        } catch (JMSException jmsEx) {
+                            log.error("Cannot handle cluster Notification", jmsEx);
+                        }
+                    }
+                });
+
+            // Start the JMS connection; allows messages to be delivered
+            connection.start();
+        } catch (Exception e) {
+            throw new InitializationException("Initialization of the JMS10BroadcastingListener failed: " + e);
+        }
+    }
+
+    /**
+     * Called by the cache administrator class when a cache is destroyed.
+     *
+     * @throws FinalizationException thrown when there was a problem finalizing the
+     * listener. The cache administrator will catch and log this error.
+     */
+    public void finialize() throws FinalizationException {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Shutting down JMS clustering...");
+            }
+
+            connection.close();
+
+            if (log.isInfoEnabled()) {
+                log.info("JMS clustering shutdown complete.");
+            }
+        } catch (JMSException e) {
+            log.warn("A problem was encountered when closing the JMS connection", e);
+        }
+    }
+
+    protected void sendNotification(ClusterNotification message) {
+        try {
+            ObjectMessage objectMessage = publisherSession.createObjectMessage();
+            objectMessage.setObject(message);
+
+            //sign the message, with the name of this node
+            objectMessage.setStringProperty("node.name", clusterNode);
+            publisher.publish(objectMessage);
+        } catch (JMSException e) {
+            log.error("Cannot send notification " + message, e);
+        }
+    }
+}

src/plugins/clustersupport/java/com/opensymphony/oscache/plugins/clustersupport/JMSBroadcastingListener.java

+/*
+ * Copyright (c) 2002-2003 by OpenSymphony
+ * All rights reserved.
+ */
+package com.opensymphony.oscache.plugins.clustersupport;
+
+import com.opensymphony.oscache.base.Cache;
+import com.opensymphony.oscache.base.Config;
+import com.opensymphony.oscache.base.FinalizationException;
+import com.opensymphony.oscache.base.InitializationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+
+import javax.naming.InitialContext;
+
+/**
+ * A JMS based clustering implementation. This implementation is independent of the
+ * JMS provider and uses non-persistent messages on a publish subscribe protocol.
+ *
+ * @author <a href="mailto:motoras@linuxmail.org">Romulus Pasca</a>
+ */
+public class JMSBroadcastingListener extends AbstractBroadcastingListener {
+    private final static Log log = LogFactory.getLog(JMSBroadcastingListener.class);
+
+    /**
+     *The JMS connection used
+     */
+    private Connection connection;
+
+    /**
+     * Th object used to publish new messages
+     */
+    private MessageProducer messagePublisher;
+
+    /**
+     * The current JMS session
+     */
+    private Session publisherSession;
+
+    /**
+     * The name of this cluster. Used to identify the sender of a message.
+     */
+    private String clusterNode;
+
+    /**
+     * <p>Called by the cache administrator class when a cache is instantiated.</p>
+     * <p>The JMS broadcasting implementation requires the following configuration
+     * properties to be specified in <code>oscache.properties</code>:
+     * <ul>
+     * <li><b>cache.cluster.topic.factory</b> - The JMS connection factory to use</li>
+     * <li><b>cache.cluster.topic.name</b> - The JMS topic name</li>
+     * <li><b>cache.cluster.name</b> - The name of this node in the cluster. This should
+     * be unique for each node.</li>
+     * Please refer to the clustering documentation for further details on configuring
+     * the JMS clustered caching.</p>
+     *
+     * @param cache the cache instance that this listener is attached to.
+     *
+     * @throws com.opensymphony.oscache.base.InitializationException thrown when there was a
+     * problem initializing the listener. The cache administrator will log this error and
+     * disable the listener.
+     */
+    public void initialize(Cache cache, Config config) throws InitializationException {
+        super.initialize(cache, config);
+
+        // Get the name of this node
+        clusterNode = config.getProperty("cache.cluster.jms.node.name");
+
+        String topic = config.getProperty("cache.cluster.jms.topic.name");
+        String topicFactory = config.getProperty("cache.cluster.jms.topic.factory");
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting JMS clustering (node name=" + clusterNode + ", topic=" + topic + ", topic factory=" + topicFactory + ")");
+        }
+
+        try {
+            // Make sure you have specified the necessary JNDI properties (usually in
+            // a jndi.properties resource file, or as system properties)
+            InitialContext jndi = new InitialContext();
+
+            // Look up a JMS connection factory
+            ConnectionFactory connectionFactory = (ConnectionFactory) jndi.lookup(topicFactory);
+
+            // Create a JMS connection
+            connection = connectionFactory.createConnection();
+
+            // Create session objects
+            publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Session subSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Look up the JMS topic
+            Topic chatTopic = (Topic) jndi.lookup(topic);
+
+            // Create the publisher and subscriber
+            messagePublisher = publisherSession.createProducer(chatTopic);
+
+            MessageConsumer messageConsumer = subSession.createConsumer(chatTopic);
+
+            // Set the message listener
+            messageConsumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message message) {
+                        try {
+                            //check the message type
+                            ObjectMessage objectMessage = null;
+
+                            if (!(message instanceof ObjectMessage)) {
+                                log.error("Cannot handle message of type (class=" + message.getClass().getName() + "). Notification ignored.");
+                                return;
+                            }
+
+                            objectMessage = (ObjectMessage) message;
+
+                            //check the message content
+                            if (!(objectMessage.getObject() instanceof ClusterNotification)) {
+                                log.error("An unknown cluster notification message received (class=" + objectMessage.getObject().getClass().getName() + "). Notification ignored.");
+                                return;
+                            }
+
+                            if (log.isDebugEnabled()) {
+                                log.debug(objectMessage.getObject());
+                            }
+
+                            // This prevents the notification sent by this node from being handled by itself
+                            if (!objectMessage.getStringProperty("node.name").equals(clusterNode)) {
+                                //now handle the message
+                                ClusterNotification notification = (ClusterNotification) objectMessage.getObject();
+                                handleClusterNotification(notification);
+                            }
+                        } catch (JMSException jmsEx) {
+                            log.error("Cannot handle cluster Notification", jmsEx);
+                        }
+                    }
+                });
+
+            // Start the JMS connection; allows messages to be delivered
+            connection.start();
+        } catch (Exception e) {
+            throw new InitializationException("Initialization of the JMSBroadcastingListener failed: " + e);
+        }
+    }
+
+    /**
+     * Called by the cache administrator class when a cache is destroyed.
+     *
+     * @throws com.opensymphony.oscache.base.FinalizationException thrown when there was a problem finalizing the
+     * listener. The cache administrator will catch and log this error.
+     */
+    public void finialize() throws FinalizationException {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Shutting down JMS clustering...");
+            }
+
+            connection.close();
+
+            if (log.isInfoEnabled()) {
+                log.info("JMS clustering shutdown complete.");
+            }
+        } catch (JMSException e) {
+            log.warn("A problem was encountered when closing the JMS connection", e);
+        }
+    }
+
+    protected void sendNotification(ClusterNotification message) {
+        try {
+            ObjectMessage objectMessage = publisherSession.createObjectMessage();
+            objectMessage.setObject(message);
+
+            //sign the message, with the name of this node
+            objectMessage.setStringProperty("node.name", clusterNode);
+            messagePublisher.send(objectMessage);
+        } catch (JMSException e) {
+            log.error("Cannot send notification " + message, e);
+        }
+    }
+}

src/plugins/clustersupport/java/com/opensymphony/oscache/plugins/clustersupport/JavaGroupsBroadcastingListener.java

+/*
+ * Copyright (c) 2002-2003 by OpenSymphony
+ * All rights reserved.
+ */
+package com.opensymphony.oscache.plugins.clustersupport;
+
+import com.opensymphony.oscache.base.Cache;
+import com.opensymphony.oscache.base.Config;
+import com.opensymphony.oscache.base.FinalizationException;
+import com.opensymphony.oscache.base.InitializationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.javagroups.Address;
+import org.javagroups.Channel;
+
+import org.javagroups.blocks.NotificationBus;
+
+import java.io.Serializable;
+
+/**
+ * <p>A concrete implementation of the {@link AbstractBroadcastingListener} based on
+ * the JavaGroups library. This Class uses JavaGroups to broadcast cache flush
+ * messages across a cluster.</p>
+ *
+ * <p>One of the following properties should be configured in <code>oscache.properties</code> for
+ * this listener:
+ * <ul>
+ * <li><b>cache.cluster.multicast.ip</b> - The multicast IP that JavaGroups should use for broadcasting</li>
+ * <li><b>cache.cluster.properties</b> - The JavaGroups channel properties to use. Allows for precise
+ * control over the behaviour of JavaGroups</li>
+ * </ul>
+ * Please refer to the clustering documentation for further details on the configuration of this listener.</p>
+ *
+ * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
+ */
+public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer {
+    private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class);
+    private static final String BUS_NAME = "OSCacheBus";
+    private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
+    private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
+
+    /**
+    * The first half of the default channel properties. They default channel properties are:
+    * <pre>
+    * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):pbcast.STABLE(desired_avg_gossip=20000):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
+    * </pre>
+    *
+    *
+    * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
+    */
+    private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr=";
+
+    /**
+    * The second half of the default channel properties. They default channel properties are:
+    * <pre>
+    * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):pbcast.STABLE(desired_avg_gossip=20000):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
+    * </pre>
+    * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
+    */
+    private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):pbcast.STABLE(desired_avg_gossip=20000):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
+    private static final String DEFAULT_MULTICAST_IP = "231.12.21.132";
+    private NotificationBus bus;
+
+    /**
+    * Initializes the broadcasting listener by starting up a JavaGroups notification
+    * bus instance to handle incoming and outgoing messages.
+    *
+    * @param config An OSCache configuration object.
+    * @throws com.opensymphony.oscache.base.InitializationException If this listener has
+    * already been initialized.
+    */
+    public synchronized void initialize(Cache cache, Config config) throws InitializationException {
+        super.initialize(cache, config);
+
+        String properties = config.getProperty(CHANNEL_PROPERTIES);
+        String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
+
+        if ((properties == null) && (multicastIP == null)) {
+            multicastIP = DEFAULT_MULTICAST_IP;
+        }
+
+        if (properties == null) {
+            properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST;
+        } else {
+            properties = properties.trim();
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties);
+        }
+
+        try {
+            bus = new NotificationBus(BUS_NAME, properties);
+            bus.start();
+            bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
+            bus.setConsumer(this);
+            log.info("JavaGroups clustering support started successfully");
+        } catch (Exception e) {
+            throw new InitializationException("Initialization failed: " + e);
+        }
+    }
+
+    /**
+    * Shuts down the JavaGroups being managed by this listener. This
+    * occurs once the cache is shut down and this listener is no longer
+    * in use.
+    *
+    * @throws com.opensymphony.oscache.base.FinalizationException
+    */
+    public synchronized void finialize() throws FinalizationException {
+        if (log.isInfoEnabled()) {
+            log.info("JavaGroups shutting down...");
+        }
+
+        bus.stop();
+        bus = null;
+
+        if (log.isInfoEnabled()) {
+            log.info("JavaGroups shutdown complete.");
+        }
+    }
+
+    /**
+    * Uses JavaGroups to broadcast the supplied notification message across the cluster.
+    *
+    * @param message The cluster nofication message to broadcast.
+    */
+    protected void sendNotification(ClusterNotification message) {
+        bus.sendNotification(message);
+    }
+
+    /**
+    * Handles incoming notification messages from JavaGroups. This method should
+    * never be called directly.
+    *
+    * @param serializable The incoming message object. This must be a {@link ClusterNotification}.
+    */
+    public void handleNotification(Serializable serializable) {
+        if (!(serializable instanceof ClusterNotification)) {
+            log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored.");
+
+            return;
+        }
+
+        handleClusterNotification((ClusterNotification) serializable);
+    }
+
+    /**
+    * We are not using the caching, so we just return something that identifies
+    * us. This method should never be called directly.
+    */
+    public Serializable getCache() {
+        return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
+    }
+
+    /**
+    * A callback that is fired when a new member joins the cluster. This
+    * method should never be called directly.
+    *
+    * @param address The address of the member who just joined.
+    */
+    public void memberJoined(Address address) {
+        if (log.isInfoEnabled()) {
+            log.info("A new member at address '" + address + "' has joined the cluster");
+        }
+    }
+
+    /**
+    * A callback that is fired when an existing member leaves the cluster.
+    * This method should never be called directly.
+    *
+    * @param address The address of the member who left.
+    */
+    public void memberLeft(Address address) {
+        if (log.isInfoEnabled()) {
+            log.info("Member at address '" + address + "' left the cluster");
+        }
+    }
+}

src/plugins/clustersupport/test/com/opensymphony/oscache/plugins/clustersupport/BaseTestBroadcastingListener.java

+/*
+ * Copyright (c) 2002-2003 by OpenSymphony
+ * All rights reserved.
+ */
+package com.opensymphony.oscache.plugins.clustersupport;
+
+import com.opensymphony.oscache.base.*;
+import com.opensymphony.oscache.base.events.CacheEntryEventListener;
+
+import junit.framework.TestCase;
+
+import java.util.Date;
+
+/**
+ * A base class that provides the framework for testing a cluster listener
+ * implementation.
+ *
+ * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
+ */
+public abstract class BaseTestBroadcastingListener extends TestCase {
+    /**
+     * The persistance listener used for the tests
+     */
+    protected static AbstractBroadcastingListener listener = null;
+
+    /**
+     * A cache instance to use for the tests
+     */
+    protected static Cache cache = null;
+
+    /**
+     * The number of tests in this class. This is used to keep
+     * track of how many tests remain; once we reach zero we shut
+     * down the broadcasting listener.
+     */
+    int testsRemaining = 0;
+
+    /**
+     * Cache group
+     */
+    private final String GROUP = "test group";
+
+    /**
+     * Object key
+     */
+    private final String KEY = "Test clustersupport persistence listener key";
+
+    public BaseTestBroadcastingListener(String str) {
+        super(str);
+    }
+
+    /**
+     * Tests the listener by causing the cache to fire off all its
+     * events
+     */
+    public void testListener() {
+        CacheEntry entry = new CacheEntry(KEY, null);
+
+        cache.putInCache(KEY, entry);
+        cache.putInCache(KEY, entry, new String[] {GROUP});
+        cache.flushEntry(KEY);
+        cache.flushGroup(GROUP);
+        cache.flushAll(new Date());
+
+        // Note that the remove event is not called since it's not exposed.
+    }
+
+    /**
+     * This method is invoked before each testXXXX methods of the
+     * class. It set up the broadcasting listener required for each test.
+     */
+    public void setUp() {
+        // At first invocation, create a listener
+        if (listener == null) {
+            testsRemaining = countTestCases(); // This seems to always return 1 even if there are multiple tests?
+
+            listener = getListener();
+            assertNotNull(listener);
+
+            cache = new Cache(true, false);
+            assertNotNull(cache);
+
+            try {
+                listener.initialize(cache, getConfig());
+            } catch (InitializationException e) {
+                fail(e.getMessage());
+            }
+
+            cache.addCacheEventListener(listener, CacheEntryEventListener.class);
+        }
+    }
+
+    /**
+     * Once all the tests are complete this will shut down the broadcasting listener.
+     */
+    protected void tearDown() throws Exception {
+        if (--testsRemaining == 0) {
+            try {
+                listener.finialize();
+                listener = null;
+            } catch (FinalizationException e) {
+                fail(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Child classes implement this to return the broadcasting listener instance
+     * that will be tested.
+     */
+    abstract AbstractBroadcastingListener getListener();
+
+    /**
+     * Child classes implement this to return the configuration for their listener
+     * @return
+     */
+    abstract Config getConfig();
+}

src/plugins/clustersupport/test/com/opensymphony/oscache/plugins/clustersupport/ListenForClusterTests.java

+/*
+ * Copyright (c) 2002-2003 by OpenSymphony
+ * All rights reserved.
+ */
+package com.opensymphony.oscache.plugins.clustersupport;
+
+import com.opensymphony.oscache.base.Cache;
+import com.opensymphony.oscache.base.Config;
+import com.opensymphony.oscache.base.FinalizationException;
+import com.opensymphony.oscache.base.InitializationException;
+import com.opensymphony.oscache.base.events.CacheEntryEventListener;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * <p>This should be used in conjunction with the cluster test cases. Run this
+ * program to set up listeners for the various clustering implementations so
+ * you can see that the test messages are being received correctly.</p>
+ *
+ * <p>A shutdown hook is installed so the listeners can be shut down cleanly.</p>
+ *
+ * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
+ */
+public final class ListenForClusterTests {
+    ArrayList listeners = new ArrayList();
+    Cache cache;
+
+    private void mainLoop() {
+        Thread shutdownHook = new ShutdownHookThread("");
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+        System.out.println();
+        System.out.println("------------------------------------------------");
+        System.out.println("Waiting for cluster messages... (CTRL-C to exit)");
+        System.out.println("------------------------------------------------");
+
+        while (true) {
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException ie) {
+            }
+        }
+    }
+
+    private void initListeners() {
+        BaseTestBroadcastingListener testcase = null;
+        AbstractBroadcastingListener listener;
+        Cache cache = new Cache(true, false);
+
+        // Add the JavaGroups listener
+        try {
+            testcase = new TestJavaGroupsBroadcastingListener("JavaGroups");
+            listener = testcase.getListener();
+            listener.initialize(cache, testcase.getConfig());
+            cache.addCacheEventListener(listener, CacheEntryEventListener.class);
+            listeners.add(listener);
+        } catch (InitializationException e) {
+            System.out.println("The JavaGroups listener could not be initialized: " + e);
+        }
+
+        // Add the JMS listener
+        try {
+            testcase = new TestJMSBroadcastingListener("JMS");
+            listener = testcase.getListener();
+
+            Config config = testcase.getConfig();
+            config.set("cache.cluster.jms.node.name", "cacheNode2");
+
+            listener.initialize(cache, config);
+            cache.addCacheEventListener(listener, CacheEntryEventListener.class);
+            listeners.add(listener);
+        } catch (InitializationException e) {
+            System.out.println("The JMS listener could not be initialized: " + e);
+        }
+    }
+
+    /**
+     * Starts up the cluster listeners.
+     */
+    public static void main(String[] args) {
+        ListenForClusterTests listen = new ListenForClusterTests();
+
+        listen.initListeners();
+
+        listen.mainLoop();
+    }
+
+    /**
+     * Inner class that handles the shutdown event
+     */
+    class ShutdownHookThread extends Thread {
+        protected String message;
+
+        public ShutdownHookThread(String message) {
+            this.message = message;
+        }
+
+        /**
+         * This is executed when the application is forcibly shutdown (via
+         * CTRL-C etc). Any configured listeners are shut down here.
+         */
+        public void run() {
+            System.out.println("Shutting down the cluster listeners...");
+
+            for (Iterator it = listeners.iterator(); it.hasNext();) {
+                try {
+                    ((AbstractBroadcastingListener) it.next()).finialize();
+                } catch (FinalizationException e) {
+                    System.out.println("The listener could not be shut down cleanly: " + e);
+                }
+            }
+
+            System.out.println("Shutdown complete.");
+        }
+    }
+}

src/plugins/clustersupport/test/com/opensymphony/oscache/plugins/clustersupport/TestBroadcastingCacheEventListener.java

-/*
- * Copyright (c) 2002-2003 by OpenSymphony
- * All rights reserved.
- */
-package com.opensymphony.oscache.plugins.clustersupport;
-
-import com.opensymphony.oscache.base.Cache;
-import com.opensymphony.oscache.base.CacheEntry;
-import com.opensymphony.oscache.base.Config;
-import com.opensymphony.oscache.base.InitializationException;
-import com.opensymphony.oscache.base.events.CacheEntryEvent;
-import com.opensymphony.oscache.base.events.CacheGroupEvent;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Test all the public methods of the broadcasting listener and assert the
- * return values
- *
- * @version        $Revision$
- * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
- */
-public final class TestBroadcastingCacheEventListener extends TestCase {
-    /**
-     * The persistance listener used for the tests
-     */
-    private static BroadcastingCacheEventListener listener = null;
-
-    /**
-     * A cache instance to use for the tests
-     */
-    private static Cache cache = null;
-
-    /**
-     * Cache group
-     */
-    private final String GROUP = "test group";
-
-    /**
-     * Object key
-     */
-    private final String KEY = "Test clustersupport persistence listener key";
-
-    public TestBroadcastingCacheEventListener(String str) {
-        super(str);
-    }
-
-    /**
-     * This methods returns the name of this test class to JUnit
-     * <p>
-     * @return The test for this class
-     */
-    public static Test suite() {
-        return new TestSuite(TestBroadcastingCacheEventListener.class);
-    }
-
-    /**
-     * This method is invoked before each testXXXX methods of the
-     * class. It set ups the variables required for each tests.
-     */
-    public void setUp() {
-        // At first invocation, create a listener
-        if (listener == null) {
-            listener = new BroadcastingCacheEventListener();
-
-            Config config = new Config();
-            config.set("cache.cluster.multicast.ip", "231.12.21.132");
-
-            try {
-                listener.initialize(null, config);
-            } catch (InitializationException e) {
-                fail(e.getMessage());
-            }
-
-            cache = new Cache(true, false);
-            assertNotNull(listener);
-            assertNotNull(cache);
-        }
-    }
-
-    public void testCacheEntryAdded() {
-        CacheEntry entry = new CacheEntry(KEY, null);
-        CacheEntryEvent event = new CacheEntryEvent(cache, entry);
-        listener.cacheEntryAdded(event);