// baseten / Sources / BXPGClearLocksHandler.m

// BXPGClearLocksHandler.m
// BaseTen
// Copyright (C) 2006-2010 Marko Karppinen & Co. LLC.
// Before using this software, please review the available licensing options
// by visiting the BaseTen website or by contacting
// Marko Karppinen & Co. LLC. Without an additional license, this software
// may be distributed only in compliance with the GNU General Public License.
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License, version 2.0,
// as published by the Free Software Foundation.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
// $Id$

#import "BXPGClearLocksHandler.h"
#import "BXDatabaseObjectIDPrivate.h"
#import "BXLogger.h"
#import "PGTSAdditions.h"
#import "PGTSHOM.h"
#import "PGTSOids.h"

/**
 * Logs a warning describing an error that occurred while a clear-locks
 * notification was being handled.
 *
 * \param self  The object reporting the error. Not used by the function body.
 * \param error The error to include in the log message. It is formatted with %@,
 *              so nil is logged as "(null)".
 */
static void
bx_error_during_clear_notification (id self, NSError* error)
{
	BXLogWarning (@"During clear notification: %@", error);
}

@implementation BXPGClearLocksHandler
/**
 * Returns the notification name string handled by this class,
 * @"baseten_unlocked_locks".
 */
+ (NSString *) notificationName
{
	return @"baseten_unlocked_locks";
}

/**
 * Looks up rows whose locks have been cleared by other backends and reports them
 * to the database context.
 *
 * Everything runs inside one transaction on mConnection:
 *  1. Query baseten.pending_locks for the observed relations that have entries
 *     newer than mLastCheck.
 *  2. Store the last_date of the last returned row with -setLastCheck:.
 *  3. For each returned relation, query its lock table for cleared locks held by
 *     other backends, build a BXDatabaseObjectID for every row and pass the IDs
 *     to -unlockedObjectsInDatabase: on the database context.
 *  4. COMMIT.
 *
 * If any query fails (or no entity is found for a table), the transaction is
 * rolled back and a warning is logged.
 *
 * \param notification The received notification. Not read by this method.
 */
- (void) handleNotification: (PGTSNotification *) notification
{
	PGTSResultSet* xactRes = nil;
	NSError* error = nil;
	
	xactRes = [mConnection executeQuery: @"BEGIN"];
	if (! [xactRes querySucceeded]) 
	{
		error = [xactRes error];
		goto error;
	}
	
	NSArray* relids = [mInterface observedRelids];
	
	//Which tables have pending locks?
	//NOTE(review): the left-hand side of the join condition below is missing
	//("ON ( = l.relid)"). The string is kept as found; restore the baseten.relation
	//column from the upstream source before use.
	NSString* query = 
	@"SELECT l.last_date, l.lock_table_name, r.relname, r.nspname "
	@" FROM baseten.pending_locks l "
	@" INNER JOIN baseten.relation r ON ( = l.relid) "
	@" WHERE l.last_date > COALESCE ($1, '-infinity')::timestamp "
	@"  AND l.relid = ANY ($2) ";
	PGTSResultSet* res = [mConnection executeQuery: query parameters: mLastCheck, relids];
	if (! [res querySucceeded])
	{
		error = [res error];
		goto error;
	}
	
	//Update the timestamp. After the loop, the stored value is the last_date of the last row.
	while ([res advanceRow]) 
		[self setLastCheck: [res valueForKey: @"last_date"]];	
	
	//Hopefully not too many tables, because we need to get unlocked rows for each of them.
	//We can't union the queries, either, because the primary key fields differ.
	NSMutableArray* ids = [NSMutableArray array];
	BXDatabaseContext* ctx = [mInterface databaseContext];
	
	//NOTE(review): res has already been advanced past its last row by the loop above and
	//no rewind call is visible here. Confirm that the result set is repositioned before
	//this loop; otherwise its body presumably never runs.
	while ([res advanceRow])
	{
		[ids removeAllObjects];
		
		NSString* relname = [res valueForKey: @"relname"];
		NSString* nspname = [res valueForKey: @"nspname"];
		PGTSTableDescription* table = [[mConnection databaseDescription] table: relname inSchema: nspname];
		PGTSResultSet* unlockedRows = nil;
		BXEntityDescription* entity = nil;
		
		//Get the unlocked rows.
		{
			//NOTE(review): three arguments are passed to the format but it contains only
			//one %@, and "l.*" has no FROM clause. A line naming the lock table and the
			//table appears to be missing. The string is kept as found.
			NSString* queryFormat =
			@"SELECT DISTINCT ON (%@) l.* "
			@"WHERE baseten_lock_cleared = true "
			@" AND baseten_lock_backend_pid != pg_backend_pid () "
			@" AND baseten_lock_timestamp > COALESCE ($1, '-infinity')::timestamp ";
			
			//Primary key field names.
			NSArray* pkeyfnames = (id) [[[[table primaryKey] columns] PGTSCollect] quotedName: mConnection];
			NSString* pkeystr = [pkeyfnames componentsJoinedByString: @", "];
			
			//Table names.
			NSString* lockTableName = [res valueForKey: @"lock_table_name"];
			NSString* qualifiedTableName = [table schemaQualifiedName: mConnection];
			
			NSString* unlockQuery = [NSString stringWithFormat: queryFormat, pkeystr, lockTableName, qualifiedTableName];
			unlockedRows = [mConnection executeQuery: unlockQuery parameters: mLastCheck];
			if (! [unlockedRows querySucceeded])
			{
				error = [unlockedRows error];
				goto error;
			}
		}
		
		//Get the entity.
		{
			NSString* tableName = [table name];
			NSString* schemaName = [table schemaName];
			entity = [[ctx databaseObjectModel] entityForTable: tableName inSchema: schemaName];
			
			//No NSError is available on this path, so the warning is logged with a nil error.
			if (! entity) 
				goto error;
		}
		
		//Create an object ID for each unlocked row.
		while ([unlockedRows advanceRow])
		{
			NSDictionary* row = [unlockedRows currentRowAsDictionary];
			BXDatabaseObjectID* anID = [BXDatabaseObjectID IDWithEntity: entity primaryKeyFields: row];
			[ids addObject: anID];
		}
		
		//Only one entity allowed per array.
		[ctx unlockedObjectsInDatabase: ids];
	}
	
	xactRes = [mConnection executeQuery: @"COMMIT"];
	if (! [xactRes querySucceeded])
	{
		error = [xactRes error];
		goto error;
	}
	
	//Success: don't fall through to the rollback below.
	return;
	
error:
	[mConnection executeQuery: @"ROLLBACK"];
	bx_error_during_clear_notification (self, error);
}
@end
// The following lines are code-browser UI hints captured with the file; they are not part of the source.
// Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
// Tip: Use camelCasing e.g. ProjME to search for
// Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
// Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
// Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
// Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
// Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.