Commits

Heikki Linnakangas  committed 317dd55

Add SP-GiST support for range types.

The implementation is a quad-tree, largely copied from the quad-tree
implementation for points. The lower and upper bound of ranges are the 2d
coordinates, with some extra code to handle empty ranges.

I left out the support for adjacent operator, -|-, from the original patch.
Not because there was necessarily anything wrong with it, but it was more
complicated than the other operators, and I only have limited time for
reviewing. That will follow as a separate patch.

Alexander Korotkov, reviewed by Jeff Davis and me.

  • Participants
  • Parent commits 89911b3

Comments (0)

Files changed (15)

File src/backend/utils/adt/Makefile

 	tsginidx.o tsgistidx.o tsquery.o tsquery_cleanup.o tsquery_gist.o \
 	tsquery_op.o tsquery_rewrite.o tsquery_util.o tsrank.o \
 	tsvector.o tsvector_op.o tsvector_parser.o \
-	txid.o uuid.o windowfuncs.o xml.o
+	txid.o uuid.o windowfuncs.o xml.o rangetypes_spgist.o
 
 like.o: like.c like_match.c
 

File src/backend/utils/adt/rangetypes_gist.c

 #include "utils/rangetypes.h"
 
 
-/* Operator strategy numbers used in the GiST range opclass */
-/* Numbers are chosen to match up operator names with existing usages */
-#define RANGESTRAT_BEFORE				1
-#define RANGESTRAT_OVERLEFT				2
-#define RANGESTRAT_OVERLAPS				3
-#define RANGESTRAT_OVERRIGHT			4
-#define RANGESTRAT_AFTER				5
-#define RANGESTRAT_ADJACENT				6
-#define RANGESTRAT_CONTAINS				7
-#define RANGESTRAT_CONTAINED_BY			8
-#define RANGESTRAT_CONTAINS_ELEM		16
-#define RANGESTRAT_EQ					18
-
 /*
  * Range class properties used to segregate different classes of ranges in
  * GiST.  Each unique combination of properties is a class.  CLS_EMPTY cannot

File src/backend/utils/adt/rangetypes_spgist.c

+/*-------------------------------------------------------------------------
+ *
+ * rangetypes_spgist.c
+ *	  implementation of quad tree over ranges mapped to 2d-points for SP-GiST.
+ *
+ * Quad tree is a data structure similar to a binary tree, but is adapted to
+ * 2d data. Each inner node of a quad tree contains a point (centroid) which
+ * divides the 2d-space into 4 quadrants. Each quadrant is associated with a
+ * child node.
+ *
+ * Ranges are mapped to 2d-points so that the lower bound is one dimension,
+ * and the upper bound is another. By convention, we visualize the lower bound
+ * to be the horizontal axis, and upper bound the vertical axis.
+ *
+ * One quirk with this mapping is the handling of empty ranges. An empty range
+ * doesn't have lower and upper bounds, so it cannot be mapped to 2d space in
+ * a straightforward way. To cope with that, the root node can have a 5th
+ * quadrant, which is reserved for empty ranges. Furthermore, there can be
+ * inner nodes in the tree with no centroid. They contain only two child nodes,
+ * one for empty ranges and another for non-empty ones. Such a node can appear
+ * as the root node, or in the tree under the 5th child of the root node (in
+ * which case it will only contain empty nodes).
+ *
+ * The SP-GiST picksplit function uses medians along both axes as the centroid.
+ * This implementation only uses the comparison function of the range element
+ * datatype, therefore it works for any range type.
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *			src/backend/utils/adt/rangetypes_spgist.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/spgist.h"
+#include "access/skey.h"
+#include "catalog/pg_type.h"
+#include "utils/builtins.h"
+#include "utils/datum.h"
+#include "utils/rangetypes.h"
+
+/* SP-GiST API functions */
+Datum		spg_range_quad_config(PG_FUNCTION_ARGS);
+Datum		spg_range_quad_choose(PG_FUNCTION_ARGS);
+Datum		spg_range_quad_picksplit(PG_FUNCTION_ARGS);
+Datum		spg_range_quad_inner_consistent(PG_FUNCTION_ARGS);
+Datum		spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS);
+
+static int16 getQuadrant(TypeCacheEntry *typcache, RangeType *centroid,
+			RangeType *tst);
+static int	bound_cmp(const void *a, const void *b, void *arg);
+
+/*
+ * SP-GiST 'config' interface function.
+ */
+Datum
+spg_range_quad_config(PG_FUNCTION_ARGS)
+{
+	/* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
+	spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
+
+	cfg->prefixType = ANYRANGEOID;
+	cfg->labelType = VOIDOID;	/* we don't need node labels */
+	cfg->canReturnData = true;
+	cfg->longValuesOK = false;
+	PG_RETURN_VOID();
+}
+
+/*----------
+ * Determine which quadrant a 2d-mapped range falls into, relative to the
+ * centroid.
+ *
+ * Quadrants are numbered like this:
+ *
+ *	 4	|  1
+ *	----+----
+ *	 3	|  2
+ *
+ * Where the lower bound of range is the horizontal axis and upper bound the
+ * vertical axis.
+ *
+ * Ranges on one of the axes are taken to lie in the quadrant with higher value
+ * along perpendicular axis. That is, a value on the horizontal axis is taken
+ * to belong to quadrant 1 or 4, and a value on the vertical axis is taken to
+ * belong to quadrant 1 or 2. A range equal to centroid is taken to lie in
+ * quadrant 1.
+ *
+ * Empty ranges are taken to lie in the special quadrant 5.
+ *----------
+ */
+static int16
+getQuadrant(TypeCacheEntry *typcache, RangeType *centroid, RangeType *tst)
+{
+	RangeBound	centroidLower,
+				centroidUpper;
+	bool		centroidEmpty;
+	RangeBound	lower,
+				upper;
+	bool		empty;
+
+	range_deserialize(typcache, centroid, &centroidLower, &centroidUpper,
+					  &centroidEmpty);
+	range_deserialize(typcache, tst, &lower, &upper, &empty);
+
+	if (empty)
+		return 5;
+
+	if (range_cmp_bounds(typcache, &lower, &centroidLower) >= 0)
+	{
+		if (range_cmp_bounds(typcache, &upper, &centroidUpper) >= 0)
+			return 1;
+		else
+			return 2;
+	}
+	else
+	{
+		if (range_cmp_bounds(typcache, &upper, &centroidUpper) >= 0)
+			return 4;
+		else
+			return 3;
+	}
+}
+
+/*
+ * Choose SP-GiST function: choose path for addition of new range.
+ */
+Datum
+spg_range_quad_choose(PG_FUNCTION_ARGS)
+{
+	spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
+	spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
+	RangeType  *inRange = DatumGetRangeType(in->datum),
+			   *centroid;
+	int16		quadrant;
+	TypeCacheEntry *typcache;
+
+	if (in->allTheSame)
+	{
+		out->resultType = spgMatchNode;
+		/* nodeN will be set by core */
+		out->result.matchNode.levelAdd = 0;
+		out->result.matchNode.restDatum = RangeTypeGetDatum(inRange);
+		PG_RETURN_VOID();
+	}
+
+	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(inRange));
+
+	/*
+	 * A node with no centroid divides ranges purely on whether they're empty
+	 * or not. All empty ranges go to child node 0, all non-empty ranges go
+	 * to node 1.
+	 */
+	if (!in->hasPrefix)
+	{
+		out->resultType = spgMatchNode;
+		if (RangeIsEmpty(inRange))
+			out->result.matchNode.nodeN = 0;
+		else
+			out->result.matchNode.nodeN = 1;
+		out->result.matchNode.levelAdd = 1;
+		out->result.matchNode.restDatum = RangeTypeGetDatum(inRange);
+		PG_RETURN_VOID();
+	}
+
+	centroid = DatumGetRangeType(in->prefixDatum);
+	quadrant = getQuadrant(typcache, centroid, inRange);
+
+	Assert(quadrant <= in->nNodes);
+
+	/* Select node matching to quadrant number */
+	out->resultType = spgMatchNode;
+	out->result.matchNode.nodeN = quadrant - 1;
+	out->result.matchNode.levelAdd = 1;
+	out->result.matchNode.restDatum = RangeTypeGetDatum(inRange);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Bound comparison for sorting.
+ */
+static int
+bound_cmp(const void *a, const void *b, void *arg)
+{
+	RangeBound *ba = (RangeBound *) a;
+	RangeBound *bb = (RangeBound *) b;
+	TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
+
+	return range_cmp_bounds(typcache, ba, bb);
+}
+
+/*
+ * Picksplit SP-GiST function: split ranges into nodes. Select "centroid"
+ * range and distribute ranges according to quadrants.
+ */
+Datum
+spg_range_quad_picksplit(PG_FUNCTION_ARGS)
+{
+	spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
+	spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
+	int			i;
+	int			j;
+	int			nonEmptyCount;
+	RangeType  *centroid;
+	bool		empty;
+	TypeCacheEntry *typcache;
+
+	/* Use the median values of lower and upper bounds as the centroid range */
+	RangeBound *lowerBounds,
+			   *upperBounds;
+
+	typcache = range_get_typcache(fcinfo,
+						  RangeTypeGetOid(DatumGetRangeType(in->datums[0])));
+
+	/* Allocate memory for bounds */
+	lowerBounds = palloc(sizeof(RangeBound) * in->nTuples);
+	upperBounds = palloc(sizeof(RangeBound) * in->nTuples);
+	j = 0;
+
+	/* Deserialize bounds of ranges, count non-empty ranges */
+	for (i = 0; i < in->nTuples; i++)
+	{
+		range_deserialize(typcache, DatumGetRangeType(in->datums[i]),
+						  &lowerBounds[j], &upperBounds[j], &empty);
+		if (!empty)
+			j++;
+	}
+	nonEmptyCount = j;
+
+	/*
+	 * All the ranges are empty. The best we can do is to construct an inner
+	 * node with no centroid, and put all ranges into node 0. If non-empty
+	 * ranges are added later, they will be routed to node 1.
+	 */
+	if (nonEmptyCount == 0)
+	{
+		out->nNodes = 2;
+		out->hasPrefix = false;
+		/* Prefix is empty */
+		out->prefixDatum = PointerGetDatum(NULL);
+		out->nodeLabels = NULL;
+
+		out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
+		out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
+
+		/* Place all ranges into node 0 */
+		for (i = 0; i < in->nTuples; i++)
+		{
+			RangeType  *range = DatumGetRangeType(in->datums[i]);
+
+			out->leafTupleDatums[i] = RangeTypeGetDatum(range);
+			out->mapTuplesToNodes[i] = 0;
+		}
+		PG_RETURN_VOID();
+	}
+
+	/* Sort range bounds in order to find medians */
+	qsort_arg(lowerBounds, nonEmptyCount, sizeof(RangeBound),
+			  bound_cmp, typcache);
+	qsort_arg(upperBounds, nonEmptyCount, sizeof(RangeBound),
+			  bound_cmp, typcache);
+
+	/* Construct "centroid" range from medians of lower and upper bounds */
+	centroid = range_serialize(typcache, &lowerBounds[nonEmptyCount / 2],
+							   &upperBounds[nonEmptyCount / 2], false);
+	out->hasPrefix = true;
+	out->prefixDatum = RangeTypeGetDatum(centroid);
+
+	/* Create node for empty ranges only if it is a root node */
+	out->nNodes = (in->level == 0) ? 5 : 4;
+	out->nodeLabels = NULL;		/* we don't need node labels */
+
+	out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
+	out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
+
+	/*
+	 * Assign ranges to corresponding nodes according to quadrants relative to
+	 * "centroid" range.
+	 */
+	for (i = 0; i < in->nTuples; i++)
+	{
+		RangeType  *range = DatumGetRangeType(in->datums[i]);
+		int16		quadrant = getQuadrant(typcache, centroid, range);
+
+		out->leafTupleDatums[i] = RangeTypeGetDatum(range);
+		out->mapTuplesToNodes[i] = quadrant - 1;
+	}
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SP-GiST consistent function for inner nodes: check which nodes are
+ * consistent with given set of queries.
+ */
+Datum
+spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
+{
+	spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
+	spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
+	int			which;
+	int			i;
+
+	if (in->allTheSame)
+	{
+		/* Report that all nodes should be visited */
+		out->nNodes = in->nNodes;
+		out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
+		for (i = 0; i < in->nNodes; i++)
+			out->nodeNumbers[i] = i;
+		PG_RETURN_VOID();
+	}
+
+	if (!in->hasPrefix)
+	{
+		/*
+		 * No centroid on this inner node. Such a node has two child nodes,
+		 * the first for empty ranges, and the second for non-empty ones.
+		 */
+		Assert(in->nNodes == 2);
+
+		/*
+		 * Nth bit of which variable means that (N - 1)th node should be
+		 * visited. Initially all bits are set. Bits of nodes which should be
+		 * skipped will be unset.
+		 */
+		which = (1 << 1) | (1 << 2);
+		for (i = 0; i < in->nkeys; i++)
+		{
+			StrategyNumber strategy = in->scankeys[i].sk_strategy;
+			bool		empty;
+
+			/*
+			 * The only strategy when second argument of operator is not range
+			 * is RANGESTRAT_CONTAINS_ELEM.
+			 */
+			if (strategy != RANGESTRAT_CONTAINS_ELEM)
+				empty = RangeIsEmpty(
+							 DatumGetRangeType(in->scankeys[i].sk_argument));
+			else
+				empty = false;
+
+			switch (strategy)
+			{
+				case RANGESTRAT_BEFORE:
+				case RANGESTRAT_OVERLEFT:
+				case RANGESTRAT_OVERLAPS:
+				case RANGESTRAT_OVERRIGHT:
+				case RANGESTRAT_AFTER:
+					/* These strategies return false if any argument is empty */
+					if (empty)
+						which = 0;
+					else
+						which &= (1 << 2);
+					break;
+
+				case RANGESTRAT_CONTAINS:
+					/*
+					 * All ranges contain an empty range. Only non-empty ranges
+					 * can contain a non-empty range.
+					 */
+					if (!empty)
+						which &= (1 << 2);
+					break;
+
+				case RANGESTRAT_CONTAINED_BY:
+					/*
+					 * Only an empty range is contained by an empty range. Both
+					 * empty and non-empty ranges can be contained by a
+					 * non-empty range.
+					 */
+					if (empty)
+						which &= (1 << 1);
+					break;
+
+				case RANGESTRAT_CONTAINS_ELEM:
+					which &= (1 << 2);
+					break;
+
+				case RANGESTRAT_EQ:
+					if (empty)
+						which &= (1 << 1);
+					else
+						which &= (1 << 2);
+					break;
+
+				default:
+					elog(ERROR, "unrecognized range strategy: %d", strategy);
+					break;
+			}
+			if (which == 0)
+				break;			/* no need to consider remaining conditions */
+		}
+	}
+	else
+	{
+		RangeBound	centroidLower,
+					centroidUpper;
+		bool		centroidEmpty;
+		TypeCacheEntry *typcache;
+		RangeType  *centroid;
+
+		/* This node has a centroid. Fetch it. */
+		centroid = DatumGetRangeType(in->prefixDatum);
+		typcache = range_get_typcache(fcinfo,
+							   RangeTypeGetOid(DatumGetRangeType(centroid)));
+		range_deserialize(typcache, centroid, &centroidLower, &centroidUpper,
+						  &centroidEmpty);
+
+		Assert(in->nNodes == 4 || in->nNodes == 5);
+
+		/*
+		 * Nth bit of which variable means that (N - 1)th node (Nth quadrant)
+		 * should be visited. Initially all bits are set. Bits of nodes which
+		 * can be skipped will be unset.
+		 */
+		which = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
+
+		for (i = 0; i < in->nkeys; i++)
+		{
+			StrategyNumber strategy;
+			RangeBound	lower,
+						upper;
+			bool		empty;
+			RangeType  *range;
+
+			/* Restrictions on range bounds according to scan strategy */
+			RangeBound *minLower = NULL,
+					   *maxLower = NULL,
+					   *minUpper = NULL,
+					   *maxUpper = NULL;
+			/* Are the restrictions on range bounds inclusive? */
+			bool		inclusive = true;
+			bool		strictEmpty = true;
+
+			strategy = in->scankeys[i].sk_strategy;
+
+			/*
+			 * RANGESTRAT_CONTAINS_ELEM is just like RANGESTRAT_CONTAINS, but
+			 * the argument is a single element. Expand the single element to
+			 * a range containing only the element, and treat it like
+			 * RANGESTRAT_CONTAINS.
+			 */
+			if (strategy == RANGESTRAT_CONTAINS_ELEM)
+			{
+				lower.inclusive = true;
+				lower.infinite = false;
+				lower.lower = true;
+				lower.val = in->scankeys[i].sk_argument;
+
+				upper.inclusive = true;
+				upper.infinite = false;
+				upper.lower = false;
+				upper.val = in->scankeys[i].sk_argument;
+
+				empty = false;
+
+				strategy = RANGESTRAT_CONTAINS;
+			}
+			else
+			{
+				range = DatumGetRangeType(in->scankeys[i].sk_argument);
+				range_deserialize(typcache, range, &lower, &upper, &empty);
+			}
+
+			/*
+			 * Most strategies are handled by forming a bounding box from the
+			 * search key, defined by a minLower, maxLower, minUpper, maxUpper.
+			 * Some modify 'which' directly, to specify exactly which quadrants
+			 * need to be visited.
+			 *
+			 * For most strategies, nothing matches an empty search key, and
+			 * an empty range never matches a non-empty key. If a strategy
+			 * does not behave like that wrt. empty ranges, set strictEmpty to
+			 * false.
+			 */
+			switch (strategy)
+			{
+				case RANGESTRAT_BEFORE:
+					/*
+					 * Range A is before range B if upper bound of A is lower
+					 * than lower bound of B.
+					 */
+					maxUpper = &lower;
+					inclusive = false;
+					break;
+
+				case RANGESTRAT_OVERLEFT:
+					/*
+					 * Range A is overleft to range B if upper bound of A is
+					 * less or equal to upper bound of B.
+					 */
+					maxUpper = &upper;
+					break;
+
+				case RANGESTRAT_OVERLAPS:
+					/*
+					 * Non-empty ranges overlap, if lower bound of each range
+					 * is lower or equal to upper bound of the other range.
+					 */
+					maxLower = &upper;
+					minUpper = &lower;
+					break;
+
+				case RANGESTRAT_OVERRIGHT:
+					/*
+					 * Range A is overright to range B if lower bound of A is
+					 * greater or equal to lower bound of B.
+					 */
+					minLower = &lower;
+					break;
+
+				case RANGESTRAT_AFTER:
+					/*
+					 * Range A is after range B if lower bound of A is greater
+					 * than upper bound of B.
+					 */
+					minLower = &upper;
+					inclusive = false;
+					break;
+
+				case RANGESTRAT_CONTAINS:
+					/*
+					 * Non-empty range A contains non-empty range B if lower
+					 * bound of A is lower or equal to lower bound of range B
+					 * and upper bound of range A is greater or equal to upper
+					 * bound of range A.
+					 *
+					 * All non-empty ranges contain an empty range.
+					 */
+					strictEmpty = false;
+					if (!empty)
+					{
+						which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
+						maxLower = &lower;
+						minUpper = &upper;
+					}
+					break;
+
+				case RANGESTRAT_CONTAINED_BY:
+					/* The opposite of contains. */
+					strictEmpty = false;
+					if (empty)
+					{
+						/* An empty range is only contained by an empty range */
+						which &= (1 << 5);
+					}
+					else
+					{
+						minLower = &lower;
+						maxUpper = &upper;
+					}
+					break;
+
+				case RANGESTRAT_EQ:
+					/*
+					 * Equal range can be only in the same quadrant where
+					 * argument would be placed to.
+					 */
+					strictEmpty = false;
+					which &= (1 << getQuadrant(typcache, centroid, range));
+					break;
+
+				default:
+					elog(ERROR, "unrecognized range strategy: %d", strategy);
+					break;
+			}
+
+			if (strictEmpty)
+			{
+				if (empty)
+				{
+					/* Scan key is empty, no branches are satisfying */
+					which = 0;
+					break;
+				}
+				else
+				{
+					/* Shouldn't visit tree branch with empty ranges */
+					which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
+				}
+			}
+
+			/*
+			 * Using the bounding box, see which quadrants we have to descend
+			 * into.
+			 */
+			if (minLower)
+			{
+				/*
+				 * If the centroid's lower bound is less than or equal to
+				 * the minimum lower bound, anything in the 3rd and 4th
+				 * quadrants will have an even smaller lower bound, and thus
+				 * can't match.
+				 */
+				if (range_cmp_bounds(typcache, &centroidLower, minLower) <= 0)
+					which &= (1 << 1) | (1 << 2) | (1 << 5);
+			}
+			if (maxLower)
+			{
+				/*
+				 * If the centroid's lower bound is greater than the maximum
+				 * lower bound, anything in the 1st and 2nd quadrants will
+				 * also have a greater than or equal lower bound, and thus
+				 * can't match. If the centroid's lower bound is equal to
+				 * the maximum lower bound, we can still exclude the 1st and
+				 * 2nd quadrants if we're looking for a value strictly greater
+				 * than the maximum.
+				 */
+				int			cmp;
+
+				cmp = range_cmp_bounds(typcache, &centroidLower, maxLower);
+				if (cmp > 0 || (!inclusive && cmp == 0))
+					which &= (1 << 3) | (1 << 4) | (1 << 5);
+			}
+			if (minUpper)
+			{
+				/*
+				 * If the centroid's upper bound is less than or equal to
+				 * the minimum upper bound, anything in the 2nd and 3rd
+				 * quadrants will have an even smaller upper bound, and thus
+				 * can't match.
+				 */
+				if (range_cmp_bounds(typcache, &centroidUpper, minUpper) <= 0)
+					which &= (1 << 1) | (1 << 4) | (1 << 5);
+			}
+			if (maxUpper)
+			{
+				/*
+				 * If the centroid's upper bound is greater than the maximum
+				 * upper bound, anything in the 1st and 4th quadrants will
+				 * also have a greater than or equal upper bound, and thus
+				 * can't match. If the centroid's upper bound is equal to
+				 * the maximum upper bound, we can still exclude the 1st and
+				 * 4th quadrants if we're looking for a value strictly greater
+				 * than the maximum.
+				 */
+				int			cmp;
+
+				cmp = range_cmp_bounds(typcache, &centroidUpper, maxUpper);
+				if (cmp > 0 || (!inclusive && cmp == 0))
+					which &= (1 << 2) | (1 << 3) | (1 << 5);
+			}
+
+			if (which == 0)
+				break;			/* no need to consider remaining conditions */
+		}
+	}
+
+	/* We must descend into the quadrant(s) identified by 'which' */
+	out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
+	out->nNodes = 0;
+	for (i = 1; i <= in->nNodes; i++)
+	{
+		if (which & (1 << i))
+			out->nodeNumbers[out->nNodes++] = i - 1;
+	}
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SP-GiST consistent function for leaf nodes: check leaf value against query
+ * using corresponding function.
+ */
+Datum
+spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)
+{
+	spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
+	spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
+	RangeType  *leafRange = DatumGetRangeType(in->leafDatum);
+	TypeCacheEntry *typcache;
+	bool		res;
+	int			i;
+
+	/* all tests are exact */
+	out->recheck = false;
+
+	/* leafDatum is what it is... */
+	out->leafValue = in->leafDatum;
+
+	typcache = range_get_typcache(fcinfo, RangeTypeGetOid(leafRange));
+
+	/* Perform the required comparison(s) */
+	res = true;
+	for (i = 0; i < in->nkeys; i++)
+	{
+		Datum		keyDatum = in->scankeys[i].sk_argument;
+
+		/* Call the function corresponding to the scan strategy */
+		switch (in->scankeys[i].sk_strategy)
+		{
+			case RANGESTRAT_BEFORE:
+				res = range_before_internal(typcache, leafRange,
+											DatumGetRangeType(keyDatum));
+				break;
+			case RANGESTRAT_OVERLEFT:
+				res = range_overleft_internal(typcache, leafRange,
+											  DatumGetRangeType(keyDatum));
+				break;
+			case RANGESTRAT_OVERLAPS:
+				res = range_overlaps_internal(typcache, leafRange,
+											  DatumGetRangeType(keyDatum));
+				break;
+			case RANGESTRAT_OVERRIGHT:
+				res = range_overright_internal(typcache, leafRange,
+											   DatumGetRangeType(keyDatum));
+				break;
+			case RANGESTRAT_AFTER:
+				res = range_after_internal(typcache, leafRange,
+										   DatumGetRangeType(keyDatum));
+				break;
+			case RANGESTRAT_CONTAINS:
+				res = range_contains_internal(typcache, leafRange,
+											  DatumGetRangeType(keyDatum));
+				break;
+			case RANGESTRAT_CONTAINED_BY:
+				res = range_contained_by_internal(typcache, leafRange,
+												DatumGetRangeType(keyDatum));
+				break;
+			case RANGESTRAT_CONTAINS_ELEM:
+				res = range_contains_elem_internal(typcache, leafRange,
+												   keyDatum);
+				break;
+			case RANGESTRAT_EQ:
+				res = range_eq_internal(typcache, leafRange,
+										DatumGetRangeType(keyDatum));
+				break;
+			default:
+				elog(ERROR, "unrecognized range strategy: %d",
+					 in->scankeys[i].sk_strategy);
+				break;
+		}
+
+		/*
+		 * If leaf datum doesn't match to a query key, no need to check
+		 * subsequent keys.
+		 */
+		if (!res)
+			break;
+	}
+
+	PG_RETURN_BOOL(res);
+}

File src/include/catalog/catversion.h

  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	201208071
+#define CATALOG_VERSION_NO	201208161
 
 #endif

File src/include/catalog/pg_amop.h

 DATA(insert (	4017   25 25 14 s	667 4000 0 ));
 DATA(insert (	4017   25 25 15 s	666 4000 0 ));
 
+/*
+ * SP-GiST range_ops
+ */
+DATA(insert (	3474   3831 3831 1 s	3893 4000 0 ));
+DATA(insert (	3474   3831 3831 2 s	3895 4000 0 ));
+DATA(insert (	3474   3831 3831 3 s	3888 4000 0 ));
+DATA(insert (	3474   3831 3831 4 s	3896 4000 0 ));
+DATA(insert (	3474   3831 3831 5 s	3894 4000 0 ));
+DATA(insert (	3474   3831 3831 7 s	3890 4000 0 ));
+DATA(insert (	3474   3831 3831 8 s	3892 4000 0 ));
+DATA(insert (	3474   3831 2283 16 s	3889 4000 0 ));
+DATA(insert (	3474   3831 3831 18 s	3882 4000 0 ));
+
 #endif   /* PG_AMOP_H */

File src/include/catalog/pg_amproc.h

 DATA(insert (	4017   25 25 3 4029 ));
 DATA(insert (	4017   25 25 4 4030 ));
 DATA(insert (	4017   25 25 5 4031 ));
+DATA(insert (	3474   3831 3831 1 3469 ));
+DATA(insert (	3474   3831 3831 2 3470 ));
+DATA(insert (	3474   3831 3831 3 3471 ));
+DATA(insert (	3474   3831 3831 4 3472 ));
+DATA(insert (	3474   3831 3831 5 3473 ));
 
 #endif   /* PG_AMPROC_H */

File src/include/catalog/pg_opclass.h

 DATA(insert (	403		range_ops			PGNSP PGUID 3901  3831 t 0 ));
 DATA(insert (	405		range_ops			PGNSP PGUID 3903  3831 t 0 ));
 DATA(insert (	783		range_ops			PGNSP PGUID 3919  3831 t 0 ));
+DATA(insert (	4000	range_ops			PGNSP PGUID 3474  3831 t 0 ));
 DATA(insert (	4000	quad_point_ops		PGNSP PGUID 4015  600 t 0 ));
 DATA(insert (	4000	kd_point_ops		PGNSP PGUID 4016  600 f 0 ));
 DATA(insert (	4000	text_ops			PGNSP PGUID 4017  25 t 0 ));

File src/include/catalog/pg_opfamily.h

 DATA(insert OID = 3901 (	403		range_ops		PGNSP PGUID ));
 DATA(insert OID = 3903 (	405		range_ops		PGNSP PGUID ));
 DATA(insert OID = 3919 (	783		range_ops		PGNSP PGUID ));
+DATA(insert OID = 3474 (	4000	range_ops		PGNSP PGUID ));
 DATA(insert OID = 4015 (	4000	quad_point_ops	PGNSP PGUID ));
 DATA(insert OID = 4016 (	4000	kd_point_ops	PGNSP PGUID ));
 DATA(insert OID = 4017 (	4000	text_ops		PGNSP PGUID ));

File src/include/catalog/pg_proc.h

 DATA(insert OID = 4031 (  spg_text_leaf_consistent	PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "2281 2281" _null_ _null_ _null_ _null_  spg_text_leaf_consistent _null_ _null_ _null_ ));
 DESCR("SP-GiST support for suffix tree over text");
 
+DATA(insert OID = 3469 (  spg_range_quad_config	PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  spg_range_quad_config _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over range");
+DATA(insert OID = 3470 (  spg_range_quad_choose	PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  spg_range_quad_choose _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over range");
+DATA(insert OID = 3471 (  spg_range_quad_picksplit	PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  spg_range_quad_picksplit _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over range");
+DATA(insert OID = 3472 (  spg_range_quad_inner_consistent	PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  spg_range_quad_inner_consistent _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over range");
+DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "2281 2281" _null_ _null_ _null_ _null_  spg_range_quad_leaf_consistent _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over range");
+
 
 /*
  * Symbolic values for provolatile column: these indicate whether the result

File src/include/utils/rangetypes.h

 #define PG_GETARG_RANGE_COPY(n)		DatumGetRangeTypeCopy(PG_GETARG_DATUM(n))
 #define PG_RETURN_RANGE(x)			return RangeTypeGetDatum(x)
 
+/* Operator strategy numbers used in the GiST and SP-GiST range opclasses */
+/* Numbers are chosen to match up operator names with existing usages */
+#define RANGESTRAT_BEFORE				1
+#define RANGESTRAT_OVERLEFT				2
+#define RANGESTRAT_OVERLAPS				3
+#define RANGESTRAT_OVERRIGHT			4
+#define RANGESTRAT_AFTER				5
+#define RANGESTRAT_ADJACENT				6
+#define RANGESTRAT_CONTAINS				7
+#define RANGESTRAT_CONTAINED_BY			8
+#define RANGESTRAT_CONTAINS_ELEM		16
+#define RANGESTRAT_EQ					18
+
 /*
  * prototypes for functions defined in rangetypes.c
  */

File src/test/regress/expected/opr_sanity.out

        2742 |            4 | =
        4000 |            1 | <<
        4000 |            1 | ~<~
+       4000 |            2 | &<
        4000 |            2 | ~<=~
+       4000 |            3 | &&
        4000 |            3 | =
+       4000 |            4 | &>
        4000 |            4 | ~>=~
        4000 |            5 | >>
        4000 |            5 | ~>~
        4000 |            6 | ~=
+       4000 |            7 | @>
        4000 |            8 | <@
        4000 |           10 | <^
        4000 |           11 | <
        4000 |           12 | <=
        4000 |           14 | >=
        4000 |           15 | >
-(55 rows)
+       4000 |           16 | @>
+       4000 |           18 | =
+(61 rows)
 
 -- Check that all opclass search operators have selectivity estimators.
 -- This is not absolutely required, but it seems a reasonable thing

File src/test/regress/expected/rangetypes.out

      5
 (1 row)
 
+-- test SP-GiST index that's been built incrementally
+create table test_range_spgist(ir int4range);
+create index test_range_spgist_idx on test_range_spgist using spgist (ir);
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1,2000) g;
+insert into test_range_spgist select 'empty'::int4range from generate_series(1,500) g;
+insert into test_range_spgist select int4range(g, g+10000) from generate_series(1,1000) g;
+insert into test_range_spgist select 'empty'::int4range from generate_series(1,500) g;
+insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1,100) g;
+insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1,100) g;
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1,2000) g;
+-- first, verify non-indexed results
+SET enable_seqscan    = t;
+SET enable_indexscan  = f;
+SET enable_bitmapscan = f;
+select count(*) from test_range_spgist where ir @> 'empty'::int4range;
+ count 
+-------
+  6200
+(1 row)
+
+select count(*) from test_range_spgist where ir = int4range(10,20);
+ count 
+-------
+     2
+(1 row)
+
+select count(*) from test_range_spgist where ir @> 10;
+ count 
+-------
+   130
+(1 row)
+
+select count(*) from test_range_spgist where ir @> int4range(10,20);
+ count 
+-------
+   111
+(1 row)
+
+select count(*) from test_range_spgist where ir && int4range(10,20);
+ count 
+-------
+   158
+(1 row)
+
+select count(*) from test_range_spgist where ir <@ int4range(10,50);
+ count 
+-------
+  1062
+(1 row)
+
+select count(*) from test_range_spgist where ir << int4range(100,500);
+ count 
+-------
+   189
+(1 row)
+
+select count(*) from test_range_spgist where ir >> int4range(100,500);
+ count 
+-------
+  3554
+(1 row)
+
+select count(*) from test_range_spgist where ir &< int4range(100,500);
+ count 
+-------
+  1029
+(1 row)
+
+select count(*) from test_range_spgist where ir &> int4range(100,500);
+ count 
+-------
+  4794
+(1 row)
+
+select count(*) from test_range_spgist where ir -|- int4range(100,500);
+ count 
+-------
+     5
+(1 row)
+
+-- now check same queries using index
+SET enable_seqscan    = f;
+SET enable_indexscan  = t;
+SET enable_bitmapscan = f;
+select count(*) from test_range_spgist where ir @> 'empty'::int4range;
+ count 
+-------
+  6200
+(1 row)
+
+select count(*) from test_range_spgist where ir = int4range(10,20);
+ count 
+-------
+     2
+(1 row)
+
+select count(*) from test_range_spgist where ir @> 10;
+ count 
+-------
+   130
+(1 row)
+
+select count(*) from test_range_spgist where ir @> int4range(10,20);
+ count 
+-------
+   111
+(1 row)
+
+select count(*) from test_range_spgist where ir && int4range(10,20);
+ count 
+-------
+   158
+(1 row)
+
+select count(*) from test_range_spgist where ir <@ int4range(10,50);
+ count 
+-------
+  1062
+(1 row)
+
+select count(*) from test_range_spgist where ir << int4range(100,500);
+ count 
+-------
+   189
+(1 row)
+
+select count(*) from test_range_spgist where ir >> int4range(100,500);
+ count 
+-------
+  3554
+(1 row)
+
+select count(*) from test_range_spgist where ir &< int4range(100,500);
+ count 
+-------
+  1029
+(1 row)
+
+select count(*) from test_range_spgist where ir &> int4range(100,500);
+ count 
+-------
+  4794
+(1 row)
+
+select count(*) from test_range_spgist where ir -|- int4range(100,500);
+ count 
+-------
+     5
+(1 row)
+
+-- now check same queries using a bulk-loaded index
+drop index test_range_spgist_idx;
+create index test_range_spgist_idx on test_range_spgist using spgist (ir);
+select count(*) from test_range_spgist where ir @> 'empty'::int4range;
+ count 
+-------
+  6200
+(1 row)
+
+select count(*) from test_range_spgist where ir = int4range(10,20);
+ count 
+-------
+     2
+(1 row)
+
+select count(*) from test_range_spgist where ir @> 10;
+ count 
+-------
+   130
+(1 row)
+
+select count(*) from test_range_spgist where ir @> int4range(10,20);
+ count 
+-------
+   111
+(1 row)
+
+select count(*) from test_range_spgist where ir && int4range(10,20);
+ count 
+-------
+   158
+(1 row)
+
+select count(*) from test_range_spgist where ir <@ int4range(10,50);
+ count 
+-------
+  1062
+(1 row)
+
+select count(*) from test_range_spgist where ir << int4range(100,500);
+ count 
+-------
+   189
+(1 row)
+
+select count(*) from test_range_spgist where ir >> int4range(100,500);
+ count 
+-------
+  3554
+(1 row)
+
+select count(*) from test_range_spgist where ir &< int4range(100,500);
+ count 
+-------
+  1029
+(1 row)
+
+select count(*) from test_range_spgist where ir &> int4range(100,500);
+ count 
+-------
+  4794
+(1 row)
+
+select count(*) from test_range_spgist where ir -|- int4range(100,500);
+ count 
+-------
+     5
+(1 row)
+
 RESET enable_seqscan;
 RESET enable_indexscan;
 RESET enable_bitmapscan;

File src/test/regress/expected/sanity_check.out

  tenk2                   | t
  test_range_excl         | t
  test_range_gist         | t
+ test_range_spgist       | t
  test_tsvector           | f
  text_tbl                | f
  time_tbl                | f
  timetz_tbl              | f
  tinterval_tbl           | f
  varchar_tbl             | f
-(154 rows)
+(155 rows)
 
 --
 -- another sanity check: every system catalog that has OIDs should have

File src/test/regress/output/misc.source

  tenk2
  test_range_excl
  test_range_gist
+ test_range_spgist
  test_tsvector
  text_tbl
  time_tbl
  toyemp
  varchar_tbl
  xacttest
-(107 rows)
+(108 rows)
 
 SELECT name(equipment(hobby_construct(text 'skywalking', text 'mer')));
  name 

File src/test/regress/sql/rangetypes.sql

 select count(*) from test_range_gist where ir &> int4range(100,500);
 select count(*) from test_range_gist where ir -|- int4range(100,500);
 
+-- test SP-GiST index that's been built incrementally
+create table test_range_spgist(ir int4range);
+create index test_range_spgist_idx on test_range_spgist using spgist (ir);
+
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1,2000) g;
+insert into test_range_spgist select 'empty'::int4range from generate_series(1,500) g;
+insert into test_range_spgist select int4range(g, g+10000) from generate_series(1,1000) g;
+insert into test_range_spgist select 'empty'::int4range from generate_series(1,500) g;
+insert into test_range_spgist select int4range(NULL,g*10,'(]') from generate_series(1,100) g;
+insert into test_range_spgist select int4range(g*10,NULL,'(]') from generate_series(1,100) g;
+insert into test_range_spgist select int4range(g, g+10) from generate_series(1,2000) g;
+
+-- first, verify non-indexed results
+SET enable_seqscan    = t;
+SET enable_indexscan  = f;
+SET enable_bitmapscan = f;
+
+select count(*) from test_range_spgist where ir @> 'empty'::int4range;
+select count(*) from test_range_spgist where ir = int4range(10,20);
+select count(*) from test_range_spgist where ir @> 10;
+select count(*) from test_range_spgist where ir @> int4range(10,20);
+select count(*) from test_range_spgist where ir && int4range(10,20);
+select count(*) from test_range_spgist where ir <@ int4range(10,50);
+select count(*) from test_range_spgist where ir << int4range(100,500);
+select count(*) from test_range_spgist where ir >> int4range(100,500);
+select count(*) from test_range_spgist where ir &< int4range(100,500);
+select count(*) from test_range_spgist where ir &> int4range(100,500);
+select count(*) from test_range_spgist where ir -|- int4range(100,500);
+
+-- now check same queries using index
+SET enable_seqscan    = f;
+SET enable_indexscan  = t;
+SET enable_bitmapscan = f;
+
+select count(*) from test_range_spgist where ir @> 'empty'::int4range;
+select count(*) from test_range_spgist where ir = int4range(10,20);
+select count(*) from test_range_spgist where ir @> 10;
+select count(*) from test_range_spgist where ir @> int4range(10,20);
+select count(*) from test_range_spgist where ir && int4range(10,20);
+select count(*) from test_range_spgist where ir <@ int4range(10,50);
+select count(*) from test_range_spgist where ir << int4range(100,500);
+select count(*) from test_range_spgist where ir >> int4range(100,500);
+select count(*) from test_range_spgist where ir &< int4range(100,500);
+select count(*) from test_range_spgist where ir &> int4range(100,500);
+select count(*) from test_range_spgist where ir -|- int4range(100,500);
+
+-- now check same queries using a bulk-loaded index
+drop index test_range_spgist_idx;
+create index test_range_spgist_idx on test_range_spgist using spgist (ir);
+
+select count(*) from test_range_spgist where ir @> 'empty'::int4range;
+select count(*) from test_range_spgist where ir = int4range(10,20);
+select count(*) from test_range_spgist where ir @> 10;
+select count(*) from test_range_spgist where ir @> int4range(10,20);
+select count(*) from test_range_spgist where ir && int4range(10,20);
+select count(*) from test_range_spgist where ir <@ int4range(10,50);
+select count(*) from test_range_spgist where ir << int4range(100,500);
+select count(*) from test_range_spgist where ir >> int4range(100,500);
+select count(*) from test_range_spgist where ir &< int4range(100,500);
+select count(*) from test_range_spgist where ir &> int4range(100,500);
+select count(*) from test_range_spgist where ir -|- int4range(100,500);
+
 RESET enable_seqscan;
 RESET enable_indexscan;
 RESET enable_bitmapscan;