Commits

Lars Kanis committed b7ce484 Draft

Implement PostgreSQL-9.2 functions PG::Connection#set_row_processor, get_row_processor, skip_result

  • Participants
  • Parent commits a45710f
  • Branches row_processor

Comments (0)

Files changed (6)

File ext/extconf.rb

 have_func 'PQsetClientEncoding'
 have_func 'PQlibVersion'
 have_func 'PQping'
+have_func 'PQsetRowProcessor'
 
 have_func 'rb_encdb_alias'
 have_func 'rb_enc_alias'
 PGconn *pg_get_pgconn	                               _(( VALUE ));
 
 VALUE pg_new_result                                    _(( PGresult *, VALUE ));
+VALUE pg_new_result_for_callback                       _(( PGresult *, VALUE ));
+VALUE pg_new_result2                                   _(( PGresult *, VALUE, void (*)( PGresult * )));
+
 VALUE pg_result_check                                  _(( VALUE ));
 VALUE pg_result_clear                                  _(( VALUE ));
 

File ext/pg_connection.c

 
 #endif /* M17N_SUPPORTED */
 
-
+#ifdef HAVE_PQSETROWPROCESSOR
+
+static int
+row_processor_proxy(PGresult *res, const PGdataValue *columns,
+		const char **errmsgp, void *param)
+{
+	VALUE proc;
+	VALUE self = (VALUE)param;
+
+	if ((proc = rb_iv_get(self, "@row_processor")) != Qnil) {
+		VALUE col_ary;
+		VALUE proc_result;
+		
+		if ( columns ) {
+			int num_fields = PQnfields(res);
+			VALUE new_row[ num_fields ];
+			int field;
+
+			/* populate the row */
+			for ( field = 0; field < num_fields; field++ ) {
+				if ( columns[ field ].len < 0 ) {
+					new_row[ field ] = Qnil;
+				}
+				else {
+					VALUE val = rb_tainted_str_new( columns[ field ].value,
+								columns[ field ].len );
+
+#ifdef M17N_SUPPORTED
+					/* associate client encoding for text format only */
+					if ( 0 == PQfformat(res, field) ) {
+						ASSOCIATE_INDEX( val, self );
+					} else {
+						rb_enc_associate( val, rb_ascii8bit_encoding() );
+					}
+#endif
+					new_row[ field ] = val;
+				}
+			}
+			col_ary = rb_ary_new4( num_fields, new_row );
+		} else {
+			col_ary = Qnil;
+		}
+
+		proc_result = rb_funcall(proc, rb_intern("call"), 2,
+			pg_new_result_for_callback(res, self),
+			col_ary
+		);
+
+		if ( NIL_P( proc_result )) {
+			return 1;
+		} else {
+			StringValue(proc_result);
+			*errmsgp = RSTRING_PTR( proc_result );
+			return -1;
+		}
+	} else {
+		*errmsgp = "Internal error: row_processor_proxy called but no row processor registered";
+		return -1;
+	}
+}
+
+/*
+ * call-seq:
+ *   conn.set_row_processor {|result, row| ... } -> nil
+ *
+ * Sets a block to process each row.
+ * 
+ * Ordinarily, when receiving a query result from the server, pg adds each
+ * row value to the current PG::Result until the entire result set is received;
+ * then the PG::Result is returned to the application as a unit. This approach
+ * is simple to work with, but becomes inefficient for large result sets.
+ * To improve performance, an application can register a custom row processor
+ * function that processes each row as the data is received from the network.
+ *
+ * When using a custom row processor, row data is not accumulated into the
+ * PG::Result, so the PG::Result ultimately delivered to the application will
+ * contain no rows (ntuples = 0). However, it still has
+ * PG::Result#result_status = PGRES_TUPLES_OK, and it contains correct
+ * information about the set of columns in the query result.
+ * On the other hand, if the query fails partway through, the returned
+ * PG::Result has result_status = PGRES_FATAL_ERROR. The application must
+ * be prepared to undo any actions of the row processor whenever it gets
+ * a PGRES_FATAL_ERROR result.
+ *
+ * This function takes a block, which should
+ * accept a two parameters that will be a PG::Result object and an Array of
+ * the values of the given row. The block should return nil in case of success
+ * and a error String in case of failure. In the error case, all remaining rows
+ * in the result set are discarded and then a PGRES_FATAL_ERROR PG::Result
+ * is returned to the application (containing the specified error message).
+ *
+ * If you pass no arguments, it will reinstall the standard row processor
+ * on the given connection.
+ *
+ * *Note:* The result object passed to the block should not be used outside
+ * of the block, since the corresponding C object could be freed after the
+ * block finishes.
+ *
+ */
+static VALUE
+pgconn_set_row_processor(VALUE self)
+{
+	VALUE proc;
+	PGconn *conn = pg_get_pgconn(self);
+
+	if( rb_block_given_p() ) {
+		proc = rb_block_proc();
+		PQsetRowProcessor(conn, row_processor_proxy, (void *)self);
+	} else {
+		/* if no block is given, set back to standard row processor */
+		proc = Qnil;
+		PQsetRowProcessor(conn, NULL, NULL);
+	}
+
+	rb_iv_set(self, "@row_processor", proc);
+	return Qnil;
+}
+
+/*
+ * call-seq:
+ *   conn.get_row_processor() -> Proc
+ *
+ * Returns the Proc object previously set by {set_row_processor},
+ * or +nil+ if it was previously the default.
+ */
+static VALUE
+pgconn_get_row_processor(VALUE self)
+{
+	VALUE proc;
+	PGconn *conn = pg_get_pgconn(self);
+	void *param;
+	PQrowProcessor row_processor = PQgetRowProcessor(conn, &param);
+
+	if ( row_processor == row_processor_proxy && param == (void *)self ) {
+		return rb_iv_get(self, "@row_processor");
+	}
+	return Qnil;
+}
+
+/*
+ * call-seq:
+ *   conn.skip_result() -> Result
+ *
+ * Discard all the remaining rows in the incoming result set.
+ *
+ * This is a simple convenience function to discard incoming data after
+ * a row processor has failed or it's determined that the rest of the
+ * result set is not interesting. +skip_result+ is exactly equivalent to
+ * {get_result} except that it transiently installs a dummy row processor
+ * function that just discards data.
+ * 
+ */
+static VALUE
+pgconn_skip_result(VALUE self)
+{
+	PGconn *conn = pg_get_pgconn(self);
+	PGresult *res = PQskipResult(conn);
+
+	return pg_new_result(res, self);
+}
+
+#endif
 
 void
 init_pg_connection()
 	rb_define_method(rb_cPGconn, "set_default_encoding", pgconn_set_default_encoding, 0);
 #endif /* M17N_SUPPORTED */
 
+#ifdef HAVE_PQSETROWPROCESSOR
+	/******     PG::Connection INSTANCE METHODS: Row Processing     ******/
+	rb_define_method(rb_cPGconn, "set_row_processor", pgconn_set_row_processor, 0);
+	rb_define_method(rb_cPGconn, "get_row_processor", pgconn_get_row_processor, 0);
+	rb_define_method(rb_cPGconn, "skip_result", pgconn_skip_result, 0);
+#endif
+
 }
 
 

File ext/pg_result.c

 VALUE
 pg_new_result(PGresult *result, VALUE rb_pgconn)
 {
+	return pg_new_result2(result, rb_pgconn, pgresult_gc_free);
+}
+
+VALUE
+pg_new_result_for_callback(PGresult *result, VALUE rb_pgconn)
+{
+	return pg_new_result2(result, rb_pgconn, NULL);
+}
+
+VALUE
+pg_new_result2(PGresult *result, VALUE rb_pgconn, void (*free)( PGresult *result ))
+{
 	PGconn *conn = pg_get_pgconn( rb_pgconn );
-	VALUE val = Data_Wrap_Struct(rb_cPGresult, NULL, pgresult_gc_free, result);
+	VALUE val = Data_Wrap_Struct(rb_cPGresult, NULL, free, result);
 #ifdef M17N_SUPPORTED
 	rb_encoding *enc = pg_conn_enc_get( conn );
 	ENCODING_SET( val, rb_enc_to_index(enc) );

File spec/lib/helpers.rb

 		PG::Connection.instance_methods.map( &:to_sym ).include?( :escape_literal )
 	config.filter_run_excluding :postgresql_91 unless
 		PG.respond_to?( :library_version )
+	config.filter_run_excluding :postgresql_92 unless
+		PG::Connection.instance_methods.map( &:to_sym ).include?( :set_row_processor )
 end
 

File spec/pg/connection_spec.rb

 	end
 
 	after( :each ) do
+		# Disable row processor
+		@conn.set_row_processor if @conn.respond_to?(:set_row_processor)
 		@conn.exec( 'ROLLBACK' ) unless example.metadata[:without_transaction]
 	end
 
 
 	end
 
+	context "under PostgreSQL 9.2 client library", :postgresql_92 do
+		it "can use set_row_processor" do
+			expected_rows = [nil, ['1','2'], ['2','3']]
+			@conn.set_row_processor do |result, row|
+				result.nfields.should == 2
+				row.should == expected_rows.shift
+				nil
+			end
+			res = @conn.exec( 'VALUES(1,2),(2,3)' )
+			res.ntuples.should == 0
+		end
+
+		it "can use set_row_processor with skip_results" do
+			expected_rows = [nil, ['1','2'], ['2','3'], nil, ['4','5']]
+			@conn.set_row_processor do |result, row|
+				row.should == expected_rows.shift
+				if row==['2','3']
+					res = @conn.skip_result
+					res.ntuples.should == 0
+				end
+				nil
+			end
+			@conn.exec( 'VALUES(1,2),(2,3),(3,4); VALUES(4,5)' )
+		end
+
+		it "should raise the error returned by the row processor" do
+			@conn.set_row_processor do |result, row|
+				"something wrong happened"
+			end
+			expect {
+				@conn.exec( 'VALUES(1,2),(2,3)' )
+			}.to raise_error( PG::Error, /something wrong happened/ )
+		end
+
+		it "should handle an exception within the row processor" do
+			@conn.set_row_processor do |result, row|
+				raise "something wrong happened"
+			end
+			expect {
+				@conn.exec( 'VALUES(1,2),(2,3)' )
+			}.to raise_error( StandardError, /something wrong happened/ )
+		end
+
+		it "can use get_row_processor" do
+			@conn.get_row_processor.should == nil
+			@conn.set_row_processor do |result, row|
+				"foobar"
+			end
+			@conn.get_row_processor.call(nil, nil).should == 'foobar'
+		end
+
+		context "multinationalization support", :ruby_19 do
+			it "row processor gets proper result encoding" do
+				@conn.set_row_processor do |result, row|
+					result.fname(0).encoding.should == Encoding::UTF_8
+					nil
+				end
+				res = @conn.exec( 'VALUES(1)' )
+			end
+		end
+	end
+	
 	context "multinationalization support", :ruby_19 do
 
 		it "should return the same bytes in text format that are sent as inline text" do