Commits

jrosenberg  committed 09a11b7

Issue number: QUARTZ-497
Submitted by: Skip Walker
Created db locking semaphore that updates a row instead of selecting for update. This works for MSSQL.
Refactored most of StdRowLockSemaphore into a common base class, DBSemaphore to be shared with the new UpdateLockRowSemaphore class.

git-svn-id: http://svn.opensymphony.com/svn/quartz/trunk@60969f7d36a-ea1c-0410-88ea-9fd03e4c9665

  • Participants
  • Parent commits 70efb04

Comments (0)

Files changed (3)

File src/java/org/quartz/impl/jdbcjobstore/DBSemaphore.java

+/* 
+ * Copyright 2004-2006 OpenSymphony 
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not 
+ * use this file except in compliance with the License. You may obtain a copy 
+ * of the License at 
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0 
+ *   
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
+ * License for the specific language governing permissions and limitations 
+ * under the License.
+ * 
+ */
+package org.quartz.impl.jdbcjobstore;
+
+import java.sql.Connection;
+import java.util.HashSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base class for database based lock handlers for providing thread/resource locking 
+ * in order to protect resources from being altered by multiple threads at the 
+ * same time.
+ */
+public abstract class DBSemaphore implements Semaphore, Constants,
+    StdJDBCConstants, TablePrefixAware {
+
+    private final Log log = LogFactory.getLog(getClass());
+
+    /*
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     * 
+     * Data members.
+     * 
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     */
+
+    ThreadLocal lockOwners = new ThreadLocal();
+
+    private String sql;
+
+    private String tablePrefix;
+    
+    private String expandedSQL;
+
+    /*
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     * 
+     * Constructors.
+     * 
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     */
+
+    public DBSemaphore(String tablePrefix, String sql, String defaultSQL) {
+        this.sql = defaultSQL;
+        this.tablePrefix = tablePrefix;
+        setSQL(sql);
+    }
+
+    /*
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     * 
+     * Interface.
+     * 
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     */
+
+    protected Log getLog() {
+        return log;
+    }
+
+    private HashSet getThreadLocks() {
+        HashSet threadLocks = (HashSet) lockOwners.get();
+        if (threadLocks == null) {
+            threadLocks = new HashSet();
+            lockOwners.set(threadLocks);
+        }
+        return threadLocks;
+    }
+
+    /**
+     * Execute the SQL that will lock the proper database row.
+     */
+    protected abstract void executeSQL(
+        Connection conn, String lockName, String expandedSQL) throws LockException;
+    
+    /**
+     * Grants a lock on the identified resource to the calling thread (blocking
+     * until it is available).
+     * 
+     * @return true if the lock was obtained.
+     */
+    public boolean obtainLock(Connection conn, String lockName)
+        throws LockException {
+
+        lockName = lockName.intern();
+
+        Log log = getLog();
+        
+        if(log.isDebugEnabled()) {
+            log.debug(
+                "Lock '" + lockName + "' is desired by: "
+                        + Thread.currentThread().getName());
+        }
+        if (!isLockOwner(conn, lockName)) {
+
+            executeSQL(conn, lockName, expandedSQL);
+            
+            if(log.isDebugEnabled()) {
+                log.debug(
+                    "Lock '" + lockName + "' given to: "
+                            + Thread.currentThread().getName());
+            }
+            getThreadLocks().add(lockName);
+            //getThreadLocksObtainer().put(lockName, new
+            // Exception("Obtainer..."));
+        } else if(log.isDebugEnabled()) {
+            log.debug(
+                "Lock '" + lockName + "' Is already owned by: "
+                        + Thread.currentThread().getName());
+        }
+
+        return true;
+    }
+
+       
+    /**
+     * Release the lock on the identified resource if it is held by the calling
+     * thread.
+     */
+    public void releaseLock(Connection conn, String lockName) {
+
+        lockName = lockName.intern();
+
+        if (isLockOwner(conn, lockName)) {
+            if(getLog().isDebugEnabled()) {
+                getLog().debug(
+                    "Lock '" + lockName + "' returned by: "
+                            + Thread.currentThread().getName());
+            }
+            getThreadLocks().remove(lockName);
+            //getThreadLocksObtainer().remove(lockName);
+        } else if (getLog().isDebugEnabled()) {
+            getLog().warn(
+                "Lock '" + lockName + "' attempt to return by: "
+                        + Thread.currentThread().getName()
+                        + " -- but not owner!",
+                new Exception("stack-trace of wrongful returner"));
+        }
+    }
+
+    /**
+     * Determine whether the calling thread owns a lock on the identified
+     * resource.
+     */
+    public boolean isLockOwner(Connection conn, String lockName) {
+        lockName = lockName.intern();
+
+        return getThreadLocks().contains(lockName);
+    }
+
+    /**
+     * This Semaphore implementation does use the database.
+     */
+    public boolean requiresConnection() {
+        return true;
+    }
+
+    protected String getSQL() {
+        return sql;
+    }
+
+    protected void setSQL(String sql) {
+        if ((sql != null) && (sql.trim().length() != 0)) {
+            this.sql = sql;
+        }
+        
+        setExpandedSQL();
+    }
+
+    private void setExpandedSQL() {
+        if (getTablePrefix() != null) {
+            expandedSQL = Util.rtp(this.sql, getTablePrefix());
+        }
+    }
+    
+    protected String getTablePrefix() {
+        return tablePrefix;
+    }
+
+    public void setTablePrefix(String tablePrefix) {
+        this.tablePrefix = tablePrefix;
+        
+        setExpandedSQL();
+    }
+}

File src/java/org/quartz/impl/jdbcjobstore/StdRowLockSemaphore.java

 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * Internal database based lock handler for providing thread/resource locking 
  * 
  * @author jhouse
  */
-public class StdRowLockSemaphore implements Semaphore, Constants,
-        StdJDBCConstants {
-
-    private final Log log = LogFactory.getLog(getClass());
+public class StdRowLockSemaphore extends DBSemaphore {
 
     /*
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     /*
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      * 
-     * Data members.
-     * 
-     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-     */
-
-    ThreadLocal lockOwners = new ThreadLocal();
-
-    //  java.util.HashMap threadLocksOb = new java.util.HashMap();
-    private String selectWithLockSQL = SELECT_FOR_LOCK;
-
-    private String tablePrefix = DEFAULT_TABLE_PREFIX;
-    
-    private String expandedSelectWithLockSQL;
-
-    /*
-     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-     * 
      * Constructors.
      * 
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      * a bean.
      */
     public StdRowLockSemaphore() {
-        setExpandedSelectWithLockSQL();
+        super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK);
     }
     
     public StdRowLockSemaphore(String tablePrefix, String selectWithLockSQL) {
-        this.tablePrefix = tablePrefix;
-        setSelectWithLockSQL(selectWithLockSQL);
+        super(tablePrefix, selectWithLockSQL, SELECT_FOR_LOCK);
     }
 
     /*
      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      */
 
-    protected Log getLog() {
-        return log;
-    }
-
-    private HashSet getThreadLocks() {
-        HashSet threadLocks = (HashSet) lockOwners.get();
-        if (threadLocks == null) {
-            threadLocks = new HashSet();
-            lockOwners.set(threadLocks);
-        }
-        return threadLocks;
-    }
-
     /**
-     * Grants a lock on the identified resource to the calling thread (blocking
-     * until it is available).
-     * 
-     * @return true if the lock was obtained.
+     * Execute the SQL select for update that will lock the proper database row.
      */
-    public boolean obtainLock(Connection conn, String lockName)
-        throws LockException {
-
-        lockName = lockName.intern();
-
-        Log log = getLog();
-        
-        if(log.isDebugEnabled()) {
-            log.debug(
-                "Lock '" + lockName + "' is desired by: "
-                        + Thread.currentThread().getName());
-        }
-        if (!isLockOwner(conn, lockName)) {
-
-            PreparedStatement ps = null;
-            ResultSet rs = null;
-            try {
-                ps = conn.prepareStatement(expandedSelectWithLockSQL);
-                ps.setString(1, lockName);
-
-                
-                if(log.isDebugEnabled()) {
-                    log.debug(
-                        "Lock '" + lockName + "' is being obtained: "
-                                + Thread.currentThread().getName());
-                }
-                rs = ps.executeQuery();
-                if (!rs.next()) {
-                    throw new SQLException(Util.rtp(
-                            "No row exists in table " + TABLE_PREFIX_SUBST
-                                    + TABLE_LOCKS + " for lock named: "
-                                    + lockName, tablePrefix));
-                }
-            } catch (SQLException sqle) {
-                //Exception src =
-                // (Exception)getThreadLocksObtainer().get(lockName);
-                //if(src != null)
-                //  src.printStackTrace();
-                //else
-                //  System.err.println("--- ***************** NO OBTAINER!");
-
-                if(log.isDebugEnabled()) {
-                    log.debug(
-                        "Lock '" + lockName + "' was not obtained by: "
-                                + Thread.currentThread().getName());
-                }
-                throw new LockException("Failure obtaining db row lock: "
-                        + sqle.getMessage(), sqle);
-            } finally {
-                if (rs != null) { 
-                    try {
-                        rs.close();
-                    } catch (Exception ignore) {
-                    }
-                }
-                if (ps != null) {
-                    try {
-                        ps.close();
-                    } catch (Exception ignore) {
-                    }
-                }
-            }
+    protected void executeSQL(Connection conn, String lockName, String expandedSQL) throws LockException {
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        try {
+            ps = conn.prepareStatement(expandedSQL);
+            ps.setString(1, lockName);
             
-            if(log.isDebugEnabled()) {
-                log.debug(
-                    "Lock '" + lockName + "' given to: "
-                            + Thread.currentThread().getName());
+            if (getLog().isDebugEnabled()) {
+                getLog().debug(
+                    "Lock '" + lockName + "' is being obtained: " + 
+                    Thread.currentThread().getName());
             }
-            getThreadLocks().add(lockName);
-            //getThreadLocksObtainer().put(lockName, new
-            // Exception("Obtainer..."));
-        } else if(log.isDebugEnabled()) {
-            log.debug(
-                "Lock '" + lockName + "' Is already owned by: "
-                        + Thread.currentThread().getName());
-        }
-
-        return true;
-    }
-
-    /**
-     * Release the lock on the identified resource if it is held by the calling
-     * thread.
-     */
-    public void releaseLock(Connection conn, String lockName) {
-
-        lockName = lockName.intern();
-
-        if (isLockOwner(conn, lockName)) {
-            if(getLog().isDebugEnabled()) {
+            rs = ps.executeQuery();
+            if (!rs.next()) {
+                throw new SQLException(Util.rtp(
+                    "No row exists in table " + TABLE_PREFIX_SUBST + 
+                    TABLE_LOCKS + " for lock named: " + lockName, getTablePrefix()));
+            }
+        } catch (SQLException sqle) {
+            //Exception src =
+            // (Exception)getThreadLocksObtainer().get(lockName);
+            //if(src != null)
+            //  src.printStackTrace();
+            //else
+            //  System.err.println("--- ***************** NO OBTAINER!");
+
+            if (getLog().isDebugEnabled()) {
                 getLog().debug(
-                    "Lock '" + lockName + "' returned by: "
-                            + Thread.currentThread().getName());
+                    "Lock '" + lockName + "' was not obtained by: " + 
+                    Thread.currentThread().getName());
+            }
+            
+            throw new LockException("Failure obtaining db row lock: "
+                    + sqle.getMessage(), sqle);
+        } finally {
+            if (rs != null) { 
+                try {
+                    rs.close();
+                } catch (Exception ignore) {
+                }
+            }
+            if (ps != null) {
+                try {
+                    ps.close();
+                } catch (Exception ignore) {
+                }
             }
-            getThreadLocks().remove(lockName);
-            //getThreadLocksObtainer().remove(lockName);
-        } else if (getLog().isDebugEnabled()) {
-            getLog().warn(
-                "Lock '" + lockName + "' attempt to retun by: "
-                        + Thread.currentThread().getName()
-                        + " -- but not owner!",
-                new Exception("stack-trace of wrongful returner"));
         }
     }
 
-    /**
-     * Determine whether the calling thread owns a lock on the identified
-     * resource.
-     */
-    public boolean isLockOwner(Connection conn, String lockName) {
-
-        lockName = lockName.intern();
-
-        return getThreadLocks().contains(lockName);
-    }
-
-    /**
-     * This Semaphore implementation does use the database.
-     */
-    public boolean requiresConnection() {
-        return true;
-    }
-
     protected String getSelectWithLockSQL() {
-        return selectWithLockSQL;
+        return getSQL();
     }
 
     public void setSelectWithLockSQL(String selectWithLockSQL) {
-        if ((selectWithLockSQL != null) && (selectWithLockSQL.trim().length() != 0)) {
-            this.selectWithLockSQL = selectWithLockSQL;
-        }
-        
-        setExpandedSelectWithLockSQL();
-    }
-
-    private void setExpandedSelectWithLockSQL() {
-        if (getTablePrefix() != null) {
-            expandedSelectWithLockSQL = Util.rtp(this.selectWithLockSQL, getTablePrefix());
-        }
-    }
-    
-    protected String getTablePrefix() {
-        return tablePrefix;
-    }
-
-    public void setTablePrefix(String tablePrefix) {
-        this.tablePrefix = tablePrefix;
-        
-        setExpandedSelectWithLockSQL();
+        setSQL(selectWithLockSQL);
     }
 }

File src/java/org/quartz/impl/jdbcjobstore/UpdateLockRowSemaphore.java

+/*
+ * Copyright 2004-2006 OpenSymphony
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.quartz.impl.jdbcjobstore;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Provide thread/resource locking in order to protect
+ * resources from being altered by multiple threads at the same time using
+ * a db row update.
+ * 
+ * <p>
+ * <b>Note:</b> This Semaphore implementation is useful for databases that do
+ * not support row locking via "SELECT FOR UPDATE" type syntax, for example
+ * Microsoft SQLServer (MSSQL).
+ * </p> 
+ */
+public class UpdateLockRowSemaphore extends DBSemaphore {
+
+    /*
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     *
+     * Constants.
+     *
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     */
+
+    public static final String UPDATE_FOR_LOCK = 
+        "UPDATE " + TABLE_PREFIX_SUBST + TABLE_LOCKS + 
+        " SET " + COL_LOCK_NAME + " = " + COL_LOCK_NAME +
+        " WHERE " + COL_LOCK_NAME + " = ? ";
+
+
+    /*
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     *
+     * Constructors.
+     *
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     */
+
+    public UpdateLockRowSemaphore() {
+        super(DEFAULT_TABLE_PREFIX, null, UPDATE_FOR_LOCK);
+    }
+
+    /*
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     *
+     * Interface.
+     *
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     */
+
+    /**
+     * Execute the SQL select for update that will lock the proper database row.
+     */
+    protected void executeSQL(Connection conn, String lockName, String expandedSQL) throws LockException {
+        PreparedStatement ps = null;
+
+        try {
+            ps = conn.prepareStatement(expandedSQL);
+            ps.setString(1, lockName);
+
+            if (getLog().isDebugEnabled()) {
+                getLog().debug(
+                    "Lock '" + lockName + "' is being obtained: " + 
+                    Thread.currentThread().getName());
+            }
+            
+            int numUpdate = ps.executeUpdate();
+            
+            if (numUpdate < 1) {
+                throw new SQLException(Util.rtp(
+                    "No row exists in table " + TABLE_PREFIX_SUBST + TABLE_LOCKS + 
+                    " for lock named: " + lockName, getTablePrefix()));
+            }
+        } catch (SQLException sqle) {
+            //Exception src =
+            // (Exception)getThreadLocksObtainer().get(lockName);
+            //if(src != null)
+            //  src.printStackTrace();
+            //else
+            //  System.err.println("--- ***************** NO OBTAINER!");
+
+            if(getLog().isDebugEnabled()) {
+                getLog().debug(
+                    "Lock '" + lockName + "' was not obtained by: " + 
+                    Thread.currentThread().getName());
+            }
+            
+            throw new LockException(
+                "Failure obtaining db row lock: " + sqle.getMessage(), sqle);
+        } finally {
+            if (ps != null) {
+                try {
+                    ps.close();
+                } catch (Exception ignore) {
+                }
+            }
+        }
+    }
+    
+    protected String getUpdateLockRowSQL() {
+        return getSQL();
+    }
+
+    public void setUpdateLockRowSQL(String updateLockRowSQL) {
+        setSQL(updateLockRowSQL);
+    }
+}