Commits

Anonymous committed e2d3c1e

Issue number: CACHE-170
Submitted by: Guillaume Berche
Reviewed by: Andres March

Had to change the management of the update states to ensure 2 states are not created for the same key, thereby keeping threads from being notified properly. One thing to revisit is the placement of the startUpdate call. Realistically this should be at the beginning of the put() but I think this will perform better and should not cause problems.

  • Participants
  • Parent commits b10a497

Comments (0)

Files changed (4)

         <!-- Clear out any previous persistent cache directory -->
         <delete dir="${test.cache.path}" failonerror="false"/>
 
-        <junit printsummary="yes" haltonfailure="yes" haltonerror="yes" fork="yes">
+        <junit printsummary="yes" haltonfailure="no" haltonerror="yes" fork="yes">
             <classpath>
                 <pathelement location="${build.test.dir}"/>
                 <path refid="plugins.cp"/>
         <!-- Clear out any previous persistent cache directory -->
         <delete dir="${test.cache.path}" failonerror="false"/>
 
-        <junit printsummary="yes" haltonfailure="yes" haltonerror="yes" fork="yes">
+        <junit printsummary="yes" haltonfailure="no" haltonerror="yes" fork="yes">
             <classpath>
                 <pathelement location="${build.test.dir}"/>
                 <path refid="plugins.cp"/>
 
         <taskdef name="junit" classname="org.apache.tools.ant.taskdefs.optional.junit.JUnitTask"/>
 
-        <junit printsummary="yes" haltonfailure="yes" haltonerror="yes" fork="yes">
+        <junit printsummary="yes" haltonfailure="no" haltonerror="yes" fork="yes">
             <sysproperty key="test.web.baseURL" value="${test.web.baseURL}"/>
             <classpath>
                 <pathelement location="${build.test.dir}"/>

File src/core/java/com/opensymphony/oscache/base/Cache.java

 
         if (key != null) {
             synchronized (updateStates) {
-                state = (EntryUpdateState) updateStates.remove(key);
+                state = (EntryUpdateState) updateStates.get(key);
 
                 if (state != null) {
                     synchronized (state) {
 
             if (state != null) {
                 synchronized (state) {
+                    if (!state.isUpdating()) {
+                        state.startUpdate();
+                    }
+
                     state.completeUpdate();
                     state.notifyAll();
                 }

File src/core/test/com/opensymphony/oscache/base/GroupConcurrencyProblemTestCase.java

+/*
+ * Copyright (c) 2002-2003 by OpenSymphony
+ * All rights reserved.
+ */
 package com.opensymphony.oscache.base;
 
+import com.opensymphony.oscache.general.GeneralCacheAdministrator;
+
 import junit.framework.TestCase;
 
-import com.opensymphony.oscache.general.GeneralCacheAdministrator;
+/**
+ * DOCUMENT ME!
+ *
+ * @author $author$
+ * @version $Revision$
+ */
+public class GroupConcurrencyProblemTestCase extends TestCase {
+    private static GeneralCacheAdministrator cache = new GeneralCacheAdministrator();
 
-public class GroupConcurrencyProblemTestCase extends TestCase {
-	private static GeneralCacheAdministrator cache = new GeneralCacheAdministrator();
+    public static void main(String[] args) {
+        System.out.println("START");
 
-	public static void main( String[] args ) {
-		System.out.println( "START" );
+        // Create some clients and start them running.
+        for (int i = 0; i < 100; i++) {
+            System.out.println("Creating thread: " + i);
 
-		// Create some clients and start them running.
-		for ( int i = 0; i < 100; i++ ) {
-			System.out.println( "Creating thread: " + i );
+            new Client(i, cache).start();
+        }
 
-			new Client( i, cache ).start();
-		}
+        System.out.println("END");
+    }
+}
 
-		System.out.println( "END" );
-	}
-}
 
 /* Inner class to hammer away at the cache. */
 class Client extends Thread {
-	private static final int MAX_ITERATIONS = 1000;
+    private static final int MAX_ITERATIONS = 1000;
+    private GeneralCacheAdministrator cache;
+    private int id;
 
-	private int id;
-	private GeneralCacheAdministrator cache;
+    public Client(int newId, GeneralCacheAdministrator newCache) {
+        super();
+        id = newId;
+        cache = newCache;
+    }
 
-	public Client(
-		int newId,
-		GeneralCacheAdministrator newCache
-	) {
-		super();
-		id = newId;
-		cache = newCache;
-	}
+    public void run() {
+        for (int i = 0; i < MAX_ITERATIONS; i++) {
+            /* Put an entry from this Client into the shared group.
+             */
+            cache.putInCache(Integer.toString(id), "Some interesting data", new String[] {
+                    "GLOBAL_GROUP"
+                });
 
-	public void run() {
-		for ( int i = 0; i < MAX_ITERATIONS; i++ ) {
-
-			/* Put an entry from this Client into the shared group.
-			 */
-			cache.putInCache(
-				Integer.toString( id ),
-				"Some interesting data",
-				new String[] { "GLOBAL_GROUP" }
-			);
-
-			// Flush that group.
-			cache.flushGroup( "GLOBAL_GROUP" );
-		}
-	}
+            // Flush that group.
+            cache.flushGroup("GLOBAL_GROUP");
+        }
+    }
 }

File src/core/test/com/opensymphony/oscache/base/TestConcurrency.java

 import junit.framework.TestCase;
 import junit.framework.TestSuite;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.BitSet;
 import java.util.Properties;
 
 /**
  * @author <a href="mailto:chris@chris.com">Chris Miller</a>
  */
 public class TestConcurrency extends TestCase {
+    private static transient final Log log = LogFactory.getLog(GeneralCacheAdministrator.class); //TestConcurrency.class
+
     // Static variables required thru all the tests
     private static GeneralCacheAdministrator admin = null;
 
         }
     }
 
+    /**
+     * Checks whether the cache handles simultaneous attempts to access a
+     * stable cache entry correctly when the blocking mode is enabled.
+     *
+     * Basically N threads are concurrently trying to access a same stale cache entry and each is cancelling its update. Each thread repeat this operation M times.
+     * The test is sucessfull if after some time, all threads are properly released
+     */
+    public void testConcurrentStaleGets() {
+        GeneralCacheAdministrator staticAdmin = admin;
+        admin = new GeneralCacheAdministrator(); //avoid poluting other test cases
+
+        try {
+            // A test for the case where oscache.blocking = true
+            //admin.destroy();
+            Properties p = new Properties();
+            p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true");
+            admin = new GeneralCacheAdministrator(p);
+
+            assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking());
+
+            int nbThreads = 50;
+            int retryByThreads = 10000;
+
+            String key = "new";
+
+            //First put a value
+            admin.putInCache(key, VALUE);
+
+            try {
+                //Then test without concurrency that it is reported as stale when time-to-live is zero 
+                admin.getFromCache(key, 0);
+                fail("NeedsRefreshException should have been thrown");
+            } catch (NeedsRefreshException nre) {
+                //Ok this is was is excpected, we can release the update
+                admin.cancelUpdate(key);
+            }
+
+            //Then ask N threads to concurrently try to access this stale resource and each should receive a NeedsRefreshException, and cancel the update
+            Thread[] spawnedThreads = new Thread[nbThreads];
+            BitSet successfullThreadTerminations = new BitSet(nbThreads); //Track which thread successfully terminated
+
+            for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
+                GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(key, 0, retryByThreads, threadIndex, successfullThreadTerminations);
+                Thread thread = new Thread(getEntry);
+                spawnedThreads[threadIndex] = thread;
+                thread.start();
+            }
+
+            // OK, those threads should now repeatidely be blocked waiting for the new cache
+            // entry to appear. Wait for all of them to terminate
+            int maxWaitingSeconds = 100;
+            int maxWaitForEachThread = 5;
+            long waitStartTime = System.currentTimeMillis();
+
+            boolean atLeastOneThreadRunning = false;
+
+            while ((System.currentTimeMillis() - waitStartTime) < (maxWaitingSeconds * 1000)) {
+                atLeastOneThreadRunning = false;
+
+                //Wait a bit between each step to avoid consumming all CPU and preventing other threads from running.
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException ie) {
+                }
+
+                //check whether all threads are done.
+                for (int threadIndex = 0; threadIndex < nbThreads;
+                        threadIndex++) {
+                    Thread inspectedThread = spawnedThreads[threadIndex];
+
+                    try {
+                        inspectedThread.join(maxWaitForEachThread * 1000);
+                    } catch (InterruptedException e) {
+                        fail("Thread #" + threadIndex + " was interrupted");
+                    }
+
+                    if (inspectedThread.isAlive()) {
+                        atLeastOneThreadRunning = true;
+                        log.error("Thread #" + threadIndex + " did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ");
+                    }
+                }
+
+                if (!atLeastOneThreadRunning) {
+                    break; //while loop, test success.
+                }
+            }
+
+            assertTrue("at least one thread did not complete within [" + ((System.currentTimeMillis() - waitStartTime) / 1000) + "] s ", !atLeastOneThreadRunning);
+
+            for (int threadIndex = 0; threadIndex < nbThreads; threadIndex++) {
+                assertTrue("thread [" + threadIndex + "] did not successfully complete. ", successfullThreadTerminations.get(threadIndex));
+            }
+        } finally {
+            admin = staticAdmin;
+
+            //Avoid po
+        }
+    }
+
     private class GetEntry implements Runnable {
         String key;
         String value;
         }
     }
 
+    /**
+      * Basically requests a stale entry, expects to receive a NeedsRefreshException, and always cancels the update.
+      */
+    private class GetStaleEntryAndCancelUpdate implements Runnable {
+        String key;
+        int retries;
+        int time;
+        private final BitSet successfullThreadTerminations;
+        private final int threadIndex;
+
+        GetStaleEntryAndCancelUpdate(String key, int time, int retries, int threadIndex, BitSet successfullThreadTerminations) {
+            this.key = key;
+            this.time = time;
+            this.retries = retries;
+            this.threadIndex = threadIndex;
+            this.successfullThreadTerminations = successfullThreadTerminations;
+        }
+
+        public void run() {
+            for (int retryIndex = 0; retryIndex < retries; retryIndex++) {
+                try {
+                    // Get from the cache
+                    Object fromCache = admin.getFromCache(key, time);
+                    assertNull("Thread index [" + retryIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache + "]", fromCache);
+                } catch (NeedsRefreshException nre) {
+                    try {
+                        admin.cancelUpdate(key);
+                    } catch (Throwable t) {
+                        log.error("Thread index [" + retryIndex + "]: Unexpectedly caught exception [" + t + "]", t);
+                        fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
+                    }
+                } catch (Throwable t) {
+                    log.error("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]", t);
+                    fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]");
+                }
+            }
+
+            //Once we successfully terminate, we update the corresponding bit to let the Junit know we succeeded.
+            synchronized (successfullThreadTerminations) {
+                successfullThreadTerminations.set(threadIndex);
+            }
+        }
+    }
+
     private class OSGeneralTest implements Runnable {
         public void doit(int i) {
             int refreshPeriod = 500 /*millis*/;