Commits

Lars Kanis committed bdad1d6

Refactor different variants of waiting for the connection socket.

Reduce the 4 variants of waiting to only 2 which are used by both

This also fixes 'Bad file descriptor' respectively wrong behaviour
of #wait_for_notify() on Windows and fixes timeout handling of

Make use of rb_thread_fd_select() on Ruby 1.9, to avoid deprecation
warnings on rb_thread_select().

  • Participants
  • Parent commits 4a6fe37

Comments (0)

Files changed (2)

File ext/extconf.rb

 have_func 'rb_enc_alias'
 have_func 'rb_thread_call_without_gvl'
 have_func 'rb_thread_call_with_gvl'
+have_func 'rb_thread_fd_select'
 
 have_const 'PGRES_COPY_BOTH', 'libpq-fe.h'
 have_const 'PGRES_SINGLE_TUPLE', 'libpq-fe.h'

File ext/pg_connection.c

 static VALUE pgconn_set_default_encoding( VALUE self );
 #endif
 
+#ifndef HAVE_RB_THREAD_FD_SELECT
+#define rb_fdset_t fd_set
+#define rb_fd_init(f)
+#define rb_fd_zero(f)  FD_ZERO(f)
+#define rb_fd_set(n, f)  FD_SET(n, f)
+#define rb_thread_fd_select rb_thread_select
+#endif
+
 /*
  * Global functions
  */
 	return hash;
 }
 
-
-#ifdef _WIN32
+/* Win32 + Ruby 1.8 */
+#if !defined( HAVE_RUBY_VM_H ) && defined( _WIN32 )
+
 /*
  * Duplicate the sockets from libpq and create temporary CRT FDs
  */
 }
 #endif
 
+/* Win32 + Ruby 1.9+ */
+#if defined( HAVE_RUBY_VM_H ) && defined( _WIN32 )
+/*
+ * On Windows, use platform-specific strategies to wait for the socket
+ * instead of rb_thread_select().
+ */
+
+int rb_w32_wait_events( HANDLE *events, int num, DWORD timeout );
+
+/* If WIN32 and Ruby 1.9 do not use rb_thread_select() which sometimes hangs
+ * and does not wait (nor sleep) any time even if timeout is given.
+ * Instead use the Winsock events and rb_w32_wait_events(). */
+
+static void *
+wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readable)(PGconn *) )
+{
+	int sd = PQsocket( conn );
+	void *retval;
+	DWORD timeout_milisec = INFINITE;
+	DWORD wait_ret;
+	WSAEVENT hEvent;
+
+	if ( sd < 0 )
+		rb_bug( "PQsocket(conn): couldn't fetch the connection's socket!" );
+
+	hEvent = WSACreateEvent();
+
+	if ( ptimeout ) {
+		timeout_milisec = (DWORD)( ptimeout->tv_sec * 1e3 + ptimeout->tv_usec / 1e3 );
+	}
+
+	/* Check for connection errors (PQisBusy is true on connection errors) */
+	if( PQconsumeInput(conn) == 0 ) {
+		WSACloseEvent( hEvent );
+		rb_raise( rb_ePGerror, PQerrorMessage(conn) );
+	}
+
+	while ( !(retval=is_readable(conn)) ) {
+		if ( WSAEventSelect(sd, hEvent, FD_READ|FD_CLOSE) == SOCKET_ERROR ) {
+			WSACloseEvent( hEvent );
+			rb_raise( rb_ePGerror, "WSAEventSelect socket error: %d", WSAGetLastError() );
+		}
+
+		wait_ret = rb_w32_wait_events( &hEvent, 1, timeout_milisec );
+
+		if ( wait_ret == WAIT_TIMEOUT ) {
+			WSACloseEvent( hEvent );
+			return NULL;
+		} else if ( wait_ret == WAIT_OBJECT_0 ) {
+			/* The event we were waiting for. */
+		} else if ( wait_ret == WAIT_FAILED ) {
+			WSACloseEvent( hEvent );
+			rb_raise( rb_ePGerror, "Wait on socket error (WaitForMultipleObjects): %d", GetLastError() );
+		} else {
+			WSACloseEvent( hEvent );
+			rb_raise( rb_ePGerror, "Wait on socket abandoned (WaitForMultipleObjects)" );
+		}
+
+		/* Check for connection errors (PQisBusy is true on connection errors) */
+		if ( PQconsumeInput(conn) == 0 ) {
+			WSACloseEvent( hEvent );
+			rb_raise( rb_ePGerror, PQerrorMessage(conn) );
+		}
+	}
+
+	WSACloseEvent( hEvent );
+	return retval;
+}
+
+#else
+
+/* non Win32 or Win32+Ruby-1.8 */
+
+static void *
+wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readable)(PGconn *))
+{
+	int sd = PQsocket( conn );
+	int ret;
+	void *retval;
+	rb_fdset_t sd_rset;
+#ifdef _WIN32
+	rb_fdset_t crt_sd_rset;
+#endif
+
+	if ( sd < 0 )
+		rb_bug( "PQsocket(conn): couldn't fetch the connection's socket!" );
+
+	/* Check for connection errors (PQisBusy is true on connection errors) */
+	if ( PQconsumeInput(conn) == 0 )
+		rb_raise( rb_ePGerror, "%s", PQerrorMessage(conn) );
+
+  rb_fd_init( &sd_rset );
+
+	while ( !(retval=is_readable(conn)) ) {
+		rb_fd_zero( &sd_rset );
+		rb_fd_set( sd, &sd_rset );
+
+#ifdef _WIN32
+		/* Ruby's FD_SET is modified on win32 to convert a file descriptor
+		 * to osfhandle, but we already get a osfhandle from PQsocket().
+		 * Therefore it's overwritten here. */
+		sd_rset.fd_array[0] = sd;
+		create_crt_fd(&sd_rset, &crt_sd_rset);
+#endif
+
+		/* Wait for the socket to become readable before checking again */
+		ret = rb_thread_fd_select( sd+1, &sd_rset, NULL, NULL, ptimeout );
+
+#ifdef _WIN32
+		cleanup_crt_fd(&sd_rset, &crt_sd_rset);
+#endif
+
+		if ( ret < 0 )
+			rb_sys_fail( "rb_thread_select()" );
+
+		/* Return false if the select() timed out */
+		if ( ret == 0 )
+			return NULL;
+
+		/* Check for connection errors (PQisBusy is true on connection errors) */
+		if ( PQconsumeInput(conn) == 0 )
+			rb_raise( rb_ePGerror, "%s", PQerrorMessage(conn) );
+	}
+
+	return retval;
+}
+
+
+#endif
+
+static void *
+notify_readable(PGconn *conn)
+{
+	return (void*)PQnotifies(conn);
+}
+
 /*
  * call-seq:
  *    conn.wait_for_notify( [ timeout ] ) -> String
 pgconn_wait_for_notify(int argc, VALUE *argv, VALUE self)
 {
 	PGconn *conn = pg_get_pgconn( self );
-	PGnotify *notification;
-	int sd = PQsocket( conn );
-	int ret;
+	PGnotify *pnotification;
 	struct timeval timeout;
 	struct timeval *ptimeout = NULL;
 	VALUE timeout_in = Qnil, relname = Qnil, be_pid = Qnil, extra = Qnil;
 	double timeout_sec;
-	fd_set sd_rset;
-#ifdef _WIN32
-	fd_set crt_sd_rset;
-#endif
-
-	if ( sd < 0 )
-		rb_bug( "PQsocket(conn): couldn't fetch the connection's socket!" );
 
 	rb_scan_args( argc, argv, "01", &timeout_in );
 
 		ptimeout = &timeout;
 	}
 
-	/* Check for notifications */
-	while ( (notification = PQnotifies(conn)) == NULL ) {
-		FD_ZERO( &sd_rset );
-		FD_SET( sd, &sd_rset );
-
-#ifdef _WIN32
-		create_crt_fd(&sd_rset, &crt_sd_rset);
-#endif
-
-		/* Wait for the socket to become readable before checking again */
-		ret = rb_thread_select( sd+1, &sd_rset, NULL, NULL, ptimeout );
-
-#ifdef _WIN32
-		cleanup_crt_fd(&sd_rset, &crt_sd_rset);
-#endif
-
-		if ( ret < 0 )
-			rb_sys_fail( 0 );
-
-		/* Return nil if the select timed out */
-		if ( ret == 0 ) return Qnil;
-
-		/* Read the socket */
-		if ( (ret = PQconsumeInput(conn)) != 1 )
-			rb_raise( rb_ePGerror, "PQconsumeInput == %d: %s", ret, PQerrorMessage(conn) );
-	}
-
-	relname = rb_tainted_str_new2( notification->relname );
+	pnotification = (PGnotify*) wait_socket_readable( conn, ptimeout, notify_readable);
+
+	/* Return nil if the select timed out */
+	if ( !pnotification ) return Qnil;
+
+	relname = rb_tainted_str_new2( pnotification->relname );
 #ifdef M17N_SUPPORTED
 	ENCODING_SET( relname, rb_enc_to_index(pg_conn_enc_get( conn )) );
 #endif
-	be_pid = INT2NUM( notification->be_pid );
+	be_pid = INT2NUM( pnotification->be_pid );
 #ifdef HAVE_ST_NOTIFY_EXTRA
-	if ( *notification->extra ) {
-		extra = rb_tainted_str_new2( notification->extra );
+	if ( *pnotification->extra ) {
+		extra = rb_tainted_str_new2( pnotification->extra );
 #ifdef M17N_SUPPORTED
 		ENCODING_SET( extra, rb_enc_to_index(pg_conn_enc_get( conn )) );
 #endif
 	}
 #endif
-	PQfreemem( notification );
+	PQfreemem( pnotification );
 
 	if ( rb_block_given_p() )
 		rb_yield_values( 3, relname, be_pid, extra );
 }
 
 
-#ifndef _WIN32
+static void *
+get_result_readable(PGconn *conn)
+{
+	return PQisBusy(conn) ? NULL : (void*)1;
+}
+
 
 /*
  * call-seq:
 static VALUE
 pgconn_block( int argc, VALUE *argv, VALUE self ) {
 	PGconn *conn = pg_get_pgconn( self );
-	int sd = PQsocket( conn );
-	int ret;
 
 	/* If WIN32 and Ruby 1.9 do not use rb_thread_select() which sometimes hangs
 	 * and does not wait (nor sleep) any time even if timeout is given.
 
 	struct timeval timeout;
 	struct timeval *ptimeout = NULL;
-	fd_set sd_rset;
 	VALUE timeout_in;
 	double timeout_sec;
+	void *ret;
 
 	if ( rb_scan_args(argc, argv, "01", &timeout_in) == 1 ) {
 		timeout_sec = NUM2DBL( timeout_in );
 		ptimeout = &timeout;
 	}
 
-	/* Check for connection errors (PQisBusy is true on connection errors) */
-	if ( PQconsumeInput(conn) == 0 )
-		rb_raise( rb_ePGerror, "%s", PQerrorMessage(conn) );
-	while ( PQisBusy(conn) ) {
-		FD_ZERO( &sd_rset );
-		FD_SET( sd, &sd_rset );
-
-		if ( (ret = rb_thread_select( sd+1, &sd_rset, NULL, NULL, ptimeout )) < 0 )
-			rb_sys_fail( "rb_thread_select()" ); /* Raises */
-
-		/* Return false if there was a timeout argument and the select() timed out */
-		if ( ret == 0 && argc )
-			return Qfalse;
-
-		/* Check for connection errors (PQisBusy is true on connection errors) */
-		if ( PQconsumeInput(conn) == 0 )
-			rb_raise( rb_ePGerror, "%s", PQerrorMessage(conn) );
-	}
+	ret = wait_socket_readable( conn, ptimeout, get_result_readable);
+
+	if( !ret )
+		return Qfalse;
 
 	return Qtrue;
 }
 
 
-#else /* _WIN32 */
-
-/*
- * Win32 PG::Connection#block -- on Windows, use platform-specific strategies to wait for the socket
- * instead of rb_thread_select().
- */
-
-/* Win32 + Ruby 1.9+ */
-#ifdef HAVE_RUBY_VM_H
-
-int rb_w32_wait_events( HANDLE *events, int num, DWORD timeout );
-
-/* If WIN32 and Ruby 1.9 do not use rb_thread_select() which sometimes hangs
- * and does not wait (nor sleep) any time even if timeout is given.
- * Instead use the Winsock events and rb_w32_wait_events(). */
-
-static VALUE
-pgconn_block( int argc, VALUE *argv, VALUE self ) {
-	PGconn *conn = pg_get_pgconn( self );
-	int sd = PQsocket( conn );
-	int ret;
-
-	DWORD timeout_milisec = INFINITY;
-	DWORD wait_ret;
-	WSAEVENT hEvent;
-	VALUE timeout_in;
-	double timeout_sec;
-
-	hEvent = WSACreateEvent();
-
-	if ( rb_scan_args(argc, argv, "01", &timeout_in) == 1 ) {
-		timeout_sec = NUM2DBL( timeout_in );
-		timeout_milisec = (DWORD)( (timeout_sec - (DWORD)timeout_sec) * 1e3 );
-	}
-
-	/* Check for connection errors (PQisBusy is true on connection errors) */
-	if( PQconsumeInput(conn) == 0 ) {
-		WSACloseEvent( hEvent );
-		rb_raise( rb_ePGerror, PQerrorMessage(conn) );
-	}
-
-	while ( PQisBusy(conn) ) {
-		if ( WSAEventSelect(sd, hEvent, FD_READ|FD_CLOSE) == SOCKET_ERROR ) {
-			WSACloseEvent( hEvent );
-			rb_raise( rb_ePGerror, "WSAEventSelect socket error: %d", WSAGetLastError() );
-		}
-
-		wait_ret = rb_w32_wait_events( &hEvent, 1, 100 );
-
-		if ( wait_ret == WAIT_TIMEOUT ) {
-			ret = 0;
-		} else if ( wait_ret == WAIT_OBJECT_0 ) {
-			ret = 1;
-		} else if ( wait_ret == WAIT_FAILED ) {
-			WSACloseEvent( hEvent );
-			rb_raise( rb_ePGerror, "Wait on socket error (WaitForMultipleObjects): %d", GetLastError() );
-		} else {
-			WSACloseEvent( hEvent );
-			rb_raise( rb_ePGerror, "Wait on socket abandoned (WaitForMultipleObjects)" );
-		}
-
-		/* Return false if there was a timeout argument and the select() timed out */
-		if ( ret == 0 && argc ) {
-			WSACloseEvent( hEvent );
-			return Qfalse;
-		}
-
-		/* Check for connection errors (PQisBusy is true on connection errors) */
-		if ( PQconsumeInput(conn) == 0 ) {
-			WSACloseEvent( hEvent );
-			rb_raise( rb_ePGerror, PQerrorMessage(conn) );
-		}
-	}
-
-	WSACloseEvent( hEvent );
-
-	return Qtrue;
-}
-
-#else /* Win32 + Ruby < 1.9 */
-
-static VALUE
-pgconn_block( int argc, VALUE *argv, VALUE self ) {
-	PGconn *conn = pg_get_pgconn( self );
-	int sd = PQsocket( conn );
-	int ret;
-
-	struct timeval timeout;
-	struct timeval *ptimeout = NULL;
-	fd_set sd_rset;
-	fd_set crt_sd_rset;
-	VALUE timeout_in;
-	double timeout_sec;
-
-	/* Always set a timeout, as rb_thread_select() sometimes
-	 * doesn't return when a second ruby thread is running although data
-	 * could be read. So we use timeout-based polling instead.
-	 */
-	timeout.tv_sec = 0;
-	 timeout.tv_usec = 10000; /* 10ms */
-	ptimeout = &timeout;
-
-	if ( rb_scan_args(argc, argv, "01", &timeout_in) == 1 ) {
-		timeout_sec = NUM2DBL( timeout_in );
-		timeout.tv_sec = (time_t)timeout_sec;
-		timeout.tv_usec = (suseconds_t)((timeout_sec - (long)timeout_sec) * 1e6);
-		ptimeout = &timeout;
-	}
-
-	/* Check for connection errors (PQisBusy is true on connection errors) */
-	if( PQconsumeInput(conn) == 0 )
-		rb_raise( rb_ePGerror, PQerrorMessage(conn) );
-
-	while ( PQisBusy(conn) ) {
-		FD_ZERO( &sd_rset );
-		FD_SET( sd, &sd_rset );
-
-		create_crt_fd( &sd_rset, &crt_sd_rset );
-		ret = rb_thread_select( sd+1, &sd_rset, NULL, NULL, ptimeout );
-		cleanup_crt_fd( &sd_rset, &crt_sd_rset );
-
-		/* Return false if there was a timeout argument and the select() timed out */
-		if ( ret == 0 && argc )
-			return Qfalse;
-
-		/* Check for connection errors (PQisBusy is true on connection errors) */
-		if ( PQconsumeInput(conn) == 0 )
-			rb_raise( rb_ePGerror, PQerrorMessage(conn) );
-	}
-
-	return Qtrue;
-}
-
-#endif /* Ruby 1.9 */
-#endif /* Win32 */
-
-
 /*
  * call-seq:
  *    conn.get_last_result( ) -> PG::Result