From 80da9e68fdd70b796b3a7de3821589513596c0f7 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sun, 4 Mar 2012 22:50:06 -0500 Subject: [PATCH] Rewrite GiST support code for rangetypes. This patch installs significantly smarter penalty and picksplit functions for ranges, making GiST indexes for them smaller and faster to search. There is no on-disk format change, so no catversion bump, but you'd need to REINDEX to get the benefits for any existing index. Alexander Korotkov, reviewed by Jeff Davis --- src/backend/utils/adt/rangetypes_gist.c | 1309 +++++++++++++++++++---- 1 file changed, 1110 insertions(+), 199 deletions(-) diff --git a/src/backend/utils/adt/rangetypes_gist.c b/src/backend/utils/adt/rangetypes_gist.c index 4267dc8cb6..87f71e6812 100644 --- a/src/backend/utils/adt/rangetypes_gist.c +++ b/src/backend/utils/adt/rangetypes_gist.c @@ -34,21 +34,118 @@ #define RANGESTRAT_CONTAINS_ELEM 16 #define RANGESTRAT_EQ 18 +/* + * Range class properties used to segregate different classes of ranges in + * GiST. Each unique combination of properties is a class. CLS_EMPTY cannot + * be combined with anything else. + */ +#define CLS_NORMAL 0 /* Ordinary finite range (no bits set) */ +#define CLS_LOWER_INF 1 /* Lower bound is infinity */ +#define CLS_UPPER_INF 2 /* Upper bound is infinity */ +#define CLS_CONTAIN_EMPTY 4 /* Contains underlying empty ranges */ +#define CLS_EMPTY 8 /* Special class for empty ranges */ + +#define CLS_COUNT 9 /* # of classes; includes all combinations of + * properties. CLS_EMPTY doesn't combine with + * anything else, so it's only 2^3 + 1. */ + +/* + * Minimum accepted ratio of split for items of the same class. If the items + * are of different classes, we will separate along those lines regardless of + * the ratio. + */ +#define LIMIT_RATIO 0.3 + +/* Constants for fixed penalty values */ +#define INFINITE_BOUND_PENALTY 2.0 +#define CONTAIN_EMPTY_PENALTY 1.0 +#define DEFAULT_SUBTYPE_DIFF_PENALTY 1.0 + +/* + * Per-item data for range_gist_single_sorting_split. + */ +typedef struct +{ + int index; + RangeBound bound; +} SingleBoundSortItem; + +/* place on left or right side of split? */ +typedef enum +{ + SPLIT_LEFT = 0, /* makes initialization to SPLIT_LEFT easier */ + SPLIT_RIGHT +} SplitLR; + +/* + * Context for range_gist_consider_split. + */ +typedef struct +{ + TypeCacheEntry *typcache; /* typcache for range type */ + bool has_subtype_diff; /* does it have subtype_diff? */ + int entries_count; /* total number of entries being split */ + + /* Information about currently selected split follows */ + + bool first; /* true if no split was selected yet */ + + RangeBound *left_upper; /* upper bound of left interval */ + RangeBound *right_lower; /* lower bound of right interval */ + + float4 ratio; /* split ratio */ + float4 overlap; /* overlap between left and right predicate */ + int common_left; /* # common entries destined for each side */ + int common_right; +} ConsiderSplitContext; + +/* + * Bounds extracted from a non-empty range, for use in + * range_gist_double_sorting_split. + */ +typedef struct +{ + RangeBound lower; + RangeBound upper; +} NonEmptyRange; + +/* + * Represents information about an entry that can be placed in either group + * without affecting overlap over selected axis ("common entry"). + */ +typedef struct +{ + /* Index of entry in the initial array */ + int index; + /* Delta between closeness of range to each of the two groups */ + double delta; +} CommonEntry; + +/* Helper macros to place an entry in the left or right group during split */ +/* Note direct access to variables v, typcache, left_range, right_range */ +#define PLACE_LEFT(range, off) \ + do { \ + if (v->spl_nleft > 0) \ + left_range = range_super_union(typcache, left_range, range); \ + else \ + left_range = (range); \ + v->spl_left[v->spl_nleft++] = (off); \ + } while(0) + +#define PLACE_RIGHT(range, off) \ + do { \ + if (v->spl_nright > 0) \ + right_range = range_super_union(typcache, right_range, range); \ + else \ + right_range = (range); \ + v->spl_right[v->spl_nright++] = (off); \ + } while(0) + /* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */ #define rangeCopy(r) \ ((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \ false, -1))) -/* - * Auxiliary structure for picksplit method. - */ -typedef struct -{ - int index; /* original index in entryvec->vector[] */ - RangeType *data; /* range value to sort */ - TypeCacheEntry *typcache; /* range type's info */ -} PickSplitSortItem; - static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType * r1, RangeType * r2); static bool range_gist_consistent_int(FmgrInfo *flinfo, @@ -57,7 +154,30 @@ static bool range_gist_consistent_int(FmgrInfo *flinfo, static bool range_gist_consistent_leaf(FmgrInfo *flinfo, StrategyNumber strategy, RangeType *key, Datum query); -static int sort_item_cmp(const void *a, const void *b); +static void range_gist_fallback_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v); +static void range_gist_class_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v, + SplitLR *classes_groups); +static void range_gist_single_sorting_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v, + bool use_upper_bound); +static void range_gist_double_sorting_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v); +static void range_gist_consider_split(ConsiderSplitContext *context, + RangeBound *right_lower, int min_left_count, + RangeBound *left_upper, int max_left_count); +static int get_gist_range_class(RangeType *range); +static int single_bound_cmp(const void *a, const void *b, void *arg); +static int interval_cmp_lower(const void *a, const void *b, void *arg); +static int interval_cmp_upper(const void *a, const void *b, void *arg); +static int common_entry_cmp(const void *i1, const void *i2); +static float8 call_subtype_diff(TypeCacheEntry *typcache, + Datum val1, Datum val2); /* GiST query consistency check */ @@ -122,7 +242,16 @@ range_gist_decompress(PG_FUNCTION_ARGS) PG_RETURN_POINTER(entry); } -/* page split penalty function */ +/* + * GiST page split penalty function. + * + * The penalty function has the following goals (in order from most to least + * important): + * - Keep normal ranges separate + * - Avoid broadening the class of the original predicate + * - Avoid broadening (as determined by subtype_diff) the original predicate + * - Favor adding ranges to narrower original predicates + */ Datum range_gist_penalty(PG_FUNCTION_ARGS) { @@ -132,118 +261,253 @@ range_gist_penalty(PG_FUNCTION_ARGS) RangeType *orig = DatumGetRangeType(origentry->key); RangeType *new = DatumGetRangeType(newentry->key); TypeCacheEntry *typcache; - RangeType *s_union; - FmgrInfo *subtype_diff; - RangeBound lower1, - lower2; - RangeBound upper1, - upper2; - bool empty1, - empty2; - float8 lower_diff, - upper_diff; + bool has_subtype_diff; + RangeBound orig_lower, + new_lower, + orig_upper, + new_upper; + bool orig_empty, + new_empty; if (RangeTypeGetOid(orig) != RangeTypeGetOid(new)) elog(ERROR, "range types do not match"); typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig)); - subtype_diff = &typcache->rng_subdiff_finfo; + has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid); + + range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty); + range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty); /* - * If new is or contains empty, and orig doesn't, apply infinite penalty. - * We really don't want to pollute an empty-free subtree with empties. + * Distinct branches for handling distinct classes of ranges. Note + * that penalty values only need to be commensurate within the same + * class of new range. */ - if (RangeIsOrContainsEmpty(new) && !RangeIsOrContainsEmpty(orig)) + if (new_empty) { - *penalty = get_float4_infinity(); - PG_RETURN_POINTER(penalty); + /* Handle insertion of empty range */ + if (orig_empty) + { + /* + * The best case is to insert it to empty original + * range. Insertion here means no broadening of original range. + * Also original range is the most narrow. + */ + *penalty = 0.0; + } + else if (RangeIsOrContainsEmpty(orig)) + { + /* + * The second case is to insert empty range into range which + * contains at least one underlying empty range. There is still + * no broadening of original range, but original range is not as + * narrow as possible. + */ + *penalty = CONTAIN_EMPTY_PENALTY; + } + else if (orig_lower.infinite && orig_upper.infinite) + { + /* + * Original range requires broadening. (-inf; +inf) is most far + * from normal range in this case. + */ + *penalty = 2 * CONTAIN_EMPTY_PENALTY; + } + else if (orig_lower.infinite || orig_upper.infinite) + { + /* + * (-inf, x) or (x, +inf) original ranges are closer to normal + * ranges, so it's worse to mix it with empty ranges. + */ + *penalty = 3 * CONTAIN_EMPTY_PENALTY; + } + else + { + /* + * The least preferred case is broadening of normal range. + */ + *penalty = 4 * CONTAIN_EMPTY_PENALTY; + } } - - /* - * We want to compare the size of "orig" to size of "orig union new". - * The penalty will be the sum of the reduction in the lower bound plus - * the increase in the upper bound. - */ - s_union = range_super_union(typcache, orig, new); - - range_deserialize(typcache, orig, &lower1, &upper1, &empty1); - range_deserialize(typcache, s_union, &lower2, &upper2, &empty2); - - /* handle cases where orig is empty */ - if (empty1 && empty2) + else if (new_lower.infinite && new_upper.infinite) { - *penalty = 0; - PG_RETURN_POINTER(penalty); + /* Handle insertion of (-inf, +inf) range */ + if (orig_lower.infinite && orig_upper.infinite) + { + /* + * Best case is inserting to (-inf, +inf) original range. + */ + *penalty = 0.0; + } + else if (orig_lower.infinite || orig_upper.infinite) + { + /* + * When original range is (-inf, x) or (x, +inf) it requires + * broadening of original range (extension of one bound to + * infinity). + */ + *penalty = INFINITE_BOUND_PENALTY; + } + else + { + /* + * Insertion to normal original range is least preferred. + */ + *penalty = 2 * INFINITE_BOUND_PENALTY; + } + + if (RangeIsOrContainsEmpty(orig)) + { + /* + * Original range is narrower when it doesn't contain empty ranges. + * Add additional penalty otherwise. + */ + *penalty += CONTAIN_EMPTY_PENALTY; + } } - else if (empty1) + else if (new_lower.infinite) { - /* infinite penalty for pushing non-empty into all-empty subtree */ - *penalty = get_float4_infinity(); - PG_RETURN_POINTER(penalty); + /* Handle insertion of (-inf, x) range */ + if (!orig_empty && orig_lower.infinite) + { + if (orig_upper.infinite) + { + /* + * (-inf, +inf) range won't be extended by insertion of + * (-inf, x) range. It's a less desirable case than insertion + * to (-inf, y) original range without extension, because in + * that case original range is narrower. But we can't express + * that in single float value. + */ + *penalty = 0.0; + } + else + { + if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0) + { + /* + * Get extension of original range using subtype_diff. + * Use constant if subtype_diff unavailable. + */ + if (has_subtype_diff) + *penalty = call_subtype_diff(typcache, + new_upper.val, + orig_upper.val); + else + *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY; + } + else + { + /* No extension of original range */ + *penalty = 0.0; + } + } + } + else + { + /* + * If lower bound of original range is not -inf, then extension + * of it is infinity. + */ + *penalty = get_float4_infinity(); + } } - - /* if orig isn't empty, s_union can't be either */ - Assert(!empty2); - - /* similarly, if orig's lower bound is infinite, s_union's must be too */ - Assert(lower2.infinite || !lower1.infinite); - - if (lower2.infinite && lower1.infinite) - lower_diff = 0; - else if (lower2.infinite) - lower_diff = get_float8_infinity(); - else if (OidIsValid(subtype_diff->fn_oid)) + else if (new_upper.infinite) { - lower_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff, - typcache->rng_collation, - lower1.val, - lower2.val)); - /* orig's lower bound must be >= s_union's */ - if (lower_diff < 0) - lower_diff = 0; /* subtype_diff is broken */ + /* Handle insertion of (x, +inf) range */ + if (!orig_empty && orig_upper.infinite) + { + if (orig_lower.infinite) + { + /* + * (-inf, +inf) range won't be extended by insertion of + * (x, +inf) range. It's a less desirable case than insertion + * to (y, +inf) original range without extension, because in + * that case original range is narrower. But we can't express + * that in single float value. + */ + *penalty = 0.0; + } + else + { + if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0) + { + /* + * Get extension of original range using subtype_diff. + * Use constant if subtype_diff unavailable. + */ + if (has_subtype_diff) + *penalty = call_subtype_diff(typcache, + orig_lower.val, + new_lower.val); + else + *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY; + } + else + { + /* No extension of original range */ + *penalty = 0.0; + } + } + } + else + { + /* + * If upper bound of original range is not +inf, then extension + * of it is infinity. + */ + *penalty = get_float4_infinity(); + } } else { - /* only know whether there is a difference or not */ - lower_diff = range_cmp_bounds(typcache, &lower1, &lower2) > 0 ? 1 : 0; + /* Handle insertion of normal (non-empty, non-infinite) range */ + if (orig_empty || orig_lower.infinite || orig_upper.infinite) + { + /* + * Avoid mixing normal ranges with infinite and empty ranges. + */ + *penalty = get_float4_infinity(); + } + else + { + /* + * Calculate extension of original range by calling subtype_diff. + * Use constant if subtype_diff unavailable. + */ + float8 diff = 0.0; + + if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0) + { + if (has_subtype_diff) + diff += call_subtype_diff(typcache, + orig_lower.val, + new_lower.val); + else + diff += DEFAULT_SUBTYPE_DIFF_PENALTY; + } + if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0) + { + if (has_subtype_diff) + diff += call_subtype_diff(typcache, + new_upper.val, + orig_upper.val); + else + diff += DEFAULT_SUBTYPE_DIFF_PENALTY; + } + *penalty = diff; + } } - /* similarly, if orig's upper bound is infinite, s_union's must be too */ - Assert(upper2.infinite || !upper1.infinite); - - if (upper2.infinite && upper1.infinite) - upper_diff = 0; - else if (upper2.infinite) - upper_diff = get_float8_infinity(); - else if (OidIsValid(subtype_diff->fn_oid)) - { - upper_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff, - typcache->rng_collation, - upper2.val, - upper1.val)); - /* orig's upper bound must be <= s_union's */ - if (upper_diff < 0) - upper_diff = 0; /* subtype_diff is broken */ - } - else - { - /* only know whether there is a difference or not */ - upper_diff = range_cmp_bounds(typcache, &upper2, &upper1) > 0 ? 1 : 0; - } - - Assert(lower_diff >= 0 && upper_diff >= 0); - - *penalty = (float) (lower_diff + upper_diff); PG_RETURN_POINTER(penalty); } /* * The GiST PickSplit method for ranges * - * Algorithm based on sorting. Incoming array of ranges is sorted using - * sort_item_cmp function. After that first half of ranges goes to the left - * output, and the second half of ranges goes to the right output. + * Primarily, we try to segregate ranges of different classes. If splitting + * ranges of the same class, use the appropriate split method for that class. */ Datum range_gist_picksplit(PG_FUNCTION_ARGS) @@ -253,73 +517,149 @@ range_gist_picksplit(PG_FUNCTION_ARGS) TypeCacheEntry *typcache; OffsetNumber i; RangeType *pred_left; - RangeType *pred_right; - PickSplitSortItem *sortItems; int nbytes; - OffsetNumber split_idx; - OffsetNumber *left; - OffsetNumber *right; OffsetNumber maxoff; + int count_in_classes[CLS_COUNT]; + int j; + int non_empty_classes_count = 0; + int biggest_class = -1; + int biggest_class_count = 0; + int total_count; /* use first item to look up range type's info */ pred_left = DatumGetRangeType(entryvec->vector[FirstOffsetNumber].key); typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left)); - /* allocate result and work arrays */ maxoff = entryvec->n - 1; nbytes = (maxoff + 1) * sizeof(OffsetNumber); v->spl_left = (OffsetNumber *) palloc(nbytes); v->spl_right = (OffsetNumber *) palloc(nbytes); - sortItems = (PickSplitSortItem *) palloc(maxoff * sizeof(PickSplitSortItem)); /* - * Prepare auxiliary array and sort the values. + * Get count distribution of range classes. */ + memset(count_in_classes, 0, sizeof(count_in_classes)); for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { - sortItems[i - 1].index = i; - sortItems[i - 1].data = DatumGetRangeType(entryvec->vector[i].key); - sortItems[i - 1].typcache = typcache; - } - qsort(sortItems, maxoff, sizeof(PickSplitSortItem), sort_item_cmp); + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); - split_idx = maxoff / 2; - - left = v->spl_left; - v->spl_nleft = 0; - right = v->spl_right; - v->spl_nright = 0; - - /* - * First half of items goes to the left output. - */ - pred_left = sortItems[0].data; - *left++ = sortItems[0].index; - v->spl_nleft++; - for (i = 1; i < split_idx; i++) - { - pred_left = range_super_union(typcache, pred_left, sortItems[i].data); - *left++ = sortItems[i].index; - v->spl_nleft++; + count_in_classes[get_gist_range_class(range)]++; } /* - * Second half of items goes to the right output. + * Count non-empty classes and find biggest class. */ - pred_right = sortItems[split_idx].data; - *right++ = sortItems[split_idx].index; - v->spl_nright++; - for (i = split_idx + 1; i < maxoff; i++) + total_count = maxoff; + for (j = 0; j < CLS_COUNT; j++) { - pred_right = range_super_union(typcache, pred_right, sortItems[i].data); - *right++ = sortItems[i].index; - v->spl_nright++; + if (count_in_classes[j] > 0) + { + if (count_in_classes[j] > biggest_class_count) + { + biggest_class_count = count_in_classes[j]; + biggest_class = j; + } + non_empty_classes_count++; + } } - *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */ + Assert(non_empty_classes_count > 0); - v->spl_ldatum = RangeTypeGetDatum(pred_left); - v->spl_rdatum = RangeTypeGetDatum(pred_right); + if (non_empty_classes_count == 1) + { + /* One non-empty class, so split inside class */ + if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL) + { + /* double sorting split for normal ranges */ + range_gist_double_sorting_split(typcache, entryvec, v); + } + else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF) + { + /* upper bound sorting split for (-inf, x) ranges */ + range_gist_single_sorting_split(typcache, entryvec, v, true); + } + else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF) + { + /* lower bound sorting split for (x, +inf) ranges */ + range_gist_single_sorting_split(typcache, entryvec, v, false); + } + else + { + /* trivial split for all (-inf, +inf) or all empty ranges */ + range_gist_fallback_split(typcache, entryvec, v); + } + } + else + { + /* + * Class based split. + * + * To which side of the split should each class go? Initialize them + * all to go to the left side. + */ + SplitLR classes_groups[CLS_COUNT]; + + memset(classes_groups, 0, sizeof(classes_groups)); + + if (count_in_classes[CLS_NORMAL] > 0) + { + /* separate normal ranges if any */ + classes_groups[CLS_NORMAL] = SPLIT_RIGHT; + } + else + { + /*---------- + * Try to split classes in one of two ways: + * 1) containing infinities - not containing infinities + * 2) containing empty - not containing empty + * + * Select the way which balances the ranges between left and right + * the best. If split in these ways is not possible, there are at + * most 3 classes, so just separate biggest class. + *---------- + */ + int infCount, nonInfCount; + int emptyCount, nonEmptyCount; + + nonInfCount = + count_in_classes[CLS_NORMAL] + + count_in_classes[CLS_CONTAIN_EMPTY] + + count_in_classes[CLS_EMPTY]; + infCount = total_count - nonInfCount; + + nonEmptyCount = + count_in_classes[CLS_NORMAL] + + count_in_classes[CLS_LOWER_INF] + + count_in_classes[CLS_UPPER_INF] + + count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF]; + emptyCount = total_count - nonEmptyCount; + + if (infCount > 0 && nonInfCount > 0 && + (Abs(infCount - nonInfCount) <= + Abs(emptyCount - nonEmptyCount))) + { + classes_groups[CLS_NORMAL] = SPLIT_RIGHT; + classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT; + classes_groups[CLS_EMPTY] = SPLIT_RIGHT; + } + else if (emptyCount > 0 && nonEmptyCount > 0) + { + classes_groups[CLS_NORMAL] = SPLIT_RIGHT; + classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT; + classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT; + classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT; + } + else + { + /* + * Either total_count == emptyCount or total_count == infCount. + */ + classes_groups[biggest_class] = SPLIT_RIGHT; + } + } + + range_gist_class_split(typcache, entryvec, v, classes_groups); + } PG_RETURN_POINTER(v); } @@ -611,78 +951,649 @@ range_gist_consistent_leaf(FmgrInfo *flinfo, StrategyNumber strategy, } /* - * Compare function for PickSplitSortItem. This is actually the - * interesting part of the picksplit algorithm. - * - * We want to separate out empty ranges, bounded ranges, and unbounded - * ranges. We assume that "contains" and "overlaps" are the most - * important queries, so empty ranges will rarely match and unbounded - * ranges frequently will. Bounded ranges should be in the middle. - * - * Empty ranges we push all the way to the left, then bounded ranges - * (sorted on lower bound, then upper), then ranges with no lower - * bound, then ranges with no upper bound; and finally, ranges with no - * upper or lower bound all the way to the right. + * Trivial split: half of entries will be placed on one page + * and the other half on the other page. */ -static int -sort_item_cmp(const void *a, const void *b) +static void +range_gist_fallback_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v) { - PickSplitSortItem *i1 = (PickSplitSortItem *) a; - PickSplitSortItem *i2 = (PickSplitSortItem *) b; - RangeType *r1 = i1->data; - RangeType *r2 = i2->data; - TypeCacheEntry *typcache = i1->typcache; - RangeBound lower1, - lower2; - RangeBound upper1, - upper2; - bool empty1, - empty2; - int cmp; + RangeType *left_range = NULL; + RangeType *right_range = NULL; + OffsetNumber i, maxoff, split_idx; - range_deserialize(typcache, r1, &lower1, &upper1, &empty1); - range_deserialize(typcache, r2, &lower2, &upper2, &empty2); + maxoff = entryvec->n - 1; + /* Split entries before this to left page, after to right: */ + split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber; - if (empty1 || empty2) + v->spl_nleft = 0; + v->spl_nright = 0; + for (i = FirstOffsetNumber; i <= maxoff; i++) { - if (empty1 && empty2) - return 0; - else if (empty1) - return -1; - else if (empty2) - return 1; + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); + + if (i < split_idx) + PLACE_LEFT(range, i); else - Assert(false); + PLACE_RIGHT(range, i); + } + + v->spl_ldatum = RangeTypeGetDatum(left_range); + v->spl_rdatum = RangeTypeGetDatum(right_range); +} + +/* + * Split based on classes of ranges. + * + * See get_gist_range_class for class definitions. + * classes_groups is an array of length CLS_COUNT indicating the side of the + * split to which each class should go. + */ +static void +range_gist_class_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v, + SplitLR *classes_groups) +{ + RangeType *left_range = NULL; + RangeType *right_range = NULL; + OffsetNumber i, maxoff; + + maxoff = entryvec->n - 1; + + v->spl_nleft = 0; + v->spl_nright = 0; + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); + int class; + + /* Get class of range */ + class = get_gist_range_class(range); + + /* Place range to appropriate page */ + if (classes_groups[class] == SPLIT_LEFT) + PLACE_LEFT(range, i); + else + { + Assert(classes_groups[class] == SPLIT_RIGHT); + PLACE_RIGHT(range, i); + } + } + + v->spl_ldatum = RangeTypeGetDatum(left_range); + v->spl_rdatum = RangeTypeGetDatum(right_range); +} + +/* + * Sorting based split. First half of entries according to the sort will be + * placed to one page, and second half of entries will be placed to other + * page. use_upper_bound parameter indicates whether to use upper or lower + * bound for sorting. + */ +static void +range_gist_single_sorting_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v, + bool use_upper_bound) +{ + SingleBoundSortItem *sortItems; + RangeType *left_range = NULL; + RangeType *right_range = NULL; + OffsetNumber i, maxoff, split_idx; + + maxoff = entryvec->n - 1; + + sortItems = (SingleBoundSortItem *) + palloc(maxoff * sizeof(SingleBoundSortItem)); + + /* + * Prepare auxiliary array and sort the values. + */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); + RangeBound bound2; + bool empty; + + sortItems[i - 1].index = i; + /* Put appropriate bound into array */ + if (use_upper_bound) + range_deserialize(typcache, range, &bound2, + &sortItems[i - 1].bound, &empty); + else + range_deserialize(typcache, range, &sortItems[i - 1].bound, + &bound2, &empty); + Assert(!empty); + } + + qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem), + single_bound_cmp, typcache); + + split_idx = maxoff / 2; + + v->spl_nleft = 0; + v->spl_nright = 0; + + for (i = 0; i < maxoff; i++) + { + int idx = sortItems[i].index; + RangeType *range = DatumGetRangeType(entryvec->vector[idx].key); + + if (i < split_idx) + PLACE_LEFT(range, idx); + else + PLACE_RIGHT(range, idx); + } + + v->spl_ldatum = RangeTypeGetDatum(left_range); + v->spl_rdatum = RangeTypeGetDatum(right_range); +} + +/* + * Double sorting split algorithm. + * + * The algorithm considers dividing ranges into two groups. The first (left) + * group contains general left bound. The second (right) group contains + * general right bound. The challenge is to find upper bound of left group + * and lower bound of right group so that overlap of groups is minimal and + * ratio of distribution is acceptable. Algorithm finds for each lower bound of + * right group minimal upper bound of left group, and for each upper bound of + * left group maximal lower bound of right group. For each found pair + * range_gist_consider_split considers replacement of currently selected + * split with the new one. + * + * After that, all the entries are divided into three groups: + * 1) Entries which should be placed to the left group + * 2) Entries which should be placed to the right group + * 3) "Common entries" which can be placed to either group without affecting + * amount of overlap. + * + * The common ranges are distributed by difference of distance from lower + * bound of common range to lower bound of right group and distance from upper + * bound of common range to upper bound of left group. + * + * For details see: + * "A new double sorting-based node splitting algorithm for R-tree", + * A. Korotkov + * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36 + */ +static void +range_gist_double_sorting_split(TypeCacheEntry *typcache, + GistEntryVector *entryvec, + GIST_SPLITVEC *v) +{ + ConsiderSplitContext context; + OffsetNumber i, maxoff; + RangeType *range, + *left_range = NULL, + *right_range = NULL; + int common_entries_count; + NonEmptyRange *by_lower, + *by_upper; + CommonEntry *common_entries; + int nentries, i1, i2; + RangeBound *right_lower, *left_upper; + + memset(&context, 0, sizeof(ConsiderSplitContext)); + context.typcache = typcache; + context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid); + + maxoff = entryvec->n - 1; + nentries = context.entries_count = maxoff - FirstOffsetNumber + 1; + context.first = true; + + /* Allocate arrays for sorted range bounds */ + by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange)); + by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange)); + + /* Fill arrays of bounds */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + RangeType *range = DatumGetRangeType(entryvec->vector[i].key); + bool empty; + + range_deserialize(typcache, range, + &by_lower[i - FirstOffsetNumber].lower, + &by_lower[i - FirstOffsetNumber].upper, + &empty); + Assert(!empty); } /* - * If both lower or both upper bounds are infinite, we sort by ascending - * range size. That means that if both upper bounds are infinite, we sort - * by the lower bound _descending_. That creates a slightly odd total - * order, but keeps the pages with very unselective predicates grouped - * more closely together on the right. + * Make two arrays of range bounds: one sorted by lower bound and another + * sorted by upper bound. */ - if (lower1.infinite || upper1.infinite || - lower2.infinite || upper2.infinite) + memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange)); + qsort_arg(by_lower, nentries, sizeof(NonEmptyRange), + interval_cmp_lower, typcache); + qsort_arg(by_upper, nentries, sizeof(NonEmptyRange), + interval_cmp_upper, typcache); + + /*---------- + * The goal is to form a left and right range, so that every entry + * range is contained by either left or right interval (or both). + * + * For example, with the ranges (0,1), (1,3), (2,3), (2,4): + * + * 0 1 2 3 4 + * +-+ + * +---+ + * +-+ + * +---+ + * + * The left and right ranges are of the form (0,a) and (b,4). + * We first consider splits where b is the lower bound of an entry. + * We iterate through all entries, and for each b, calculate the + * smallest possible a. Then we consider splits where a is the + * upper bound of an entry, and for each a, calculate the greatest + * possible b. + * + * In the above example, the first loop would consider splits: + * b=0: (0,1)-(0,4) + * b=1: (0,1)-(1,4) + * b=2: (0,3)-(2,4) + * + * And the second loop: + * a=1: (0,1)-(1,4) + * a=3: (0,3)-(2,4) + * a=4: (0,4)-(2,4) + *---------- + */ + + /* + * Iterate over lower bound of right group, finding smallest possible + * upper bound of left group. + */ + i1 = 0; + i2 = 0; + right_lower = &by_lower[i1].lower; + left_upper = &by_upper[i2].lower; + while (true) { - if (lower1.infinite && lower2.infinite) - return range_cmp_bounds(typcache, &upper1, &upper2); - else if (lower1.infinite) - return -1; - else if (lower2.infinite) - return 1; - else if (upper1.infinite && upper2.infinite) - return -(range_cmp_bounds(typcache, &lower1, &lower2)); - else if (upper1.infinite) - return 1; - else if (upper2.infinite) - return -1; - else - Assert(false); + /* + * Find next lower bound of right group. + */ + while (i1 < nentries && + range_cmp_bounds(typcache, right_lower, + &by_lower[i1].lower) == 0) + { + if (range_cmp_bounds(typcache, &by_lower[i1].upper, + left_upper) > 0) + left_upper = &by_lower[i1].upper; + i1++; + } + if (i1 >= nentries) + break; + right_lower = &by_lower[i1].lower; + + /* + * Find count of ranges which anyway should be placed to the + * left group. + */ + while (i2 < nentries && + range_cmp_bounds(typcache, &by_upper[i2].upper, + left_upper) <= 0) + i2++; + + /* + * Consider found split to see if it's better than what we had. + */ + range_gist_consider_split(&context, right_lower, i1, left_upper, i2); } - if ((cmp = range_cmp_bounds(typcache, &lower1, &lower2)) != 0) - return cmp; + /* + * Iterate over upper bound of left group finding greatest possible + * lower bound of right group. + */ + i1 = nentries - 1; + i2 = nentries - 1; + right_lower = &by_lower[i1].upper; + left_upper = &by_upper[i2].upper; + while (true) + { + /* + * Find next upper bound of left group. + */ + while (i2 >= 0 && + range_cmp_bounds(typcache, left_upper, + &by_upper[i2].upper) == 0) + { + if (range_cmp_bounds(typcache, &by_upper[i2].lower, + right_lower) < 0) + right_lower = &by_upper[i2].lower; + i2--; + } + if (i2 < 0) + break; + left_upper = &by_upper[i2].upper; - return range_cmp_bounds(typcache, &upper1, &upper2); + /* + * Find count of intervals which anyway should be placed to the + * right group. + */ + while (i1 >= 0 && + range_cmp_bounds(typcache, &by_lower[i1].lower, + right_lower) >= 0) + i1--; + + /* + * Consider found split to see if it's better than what we had. + */ + range_gist_consider_split(&context, right_lower, i1 + 1, + left_upper, i2 + 1); + } + + /* + * If we failed to find any acceptable splits, use trivial split. + */ + if (context.first) + { + range_gist_fallback_split(typcache, entryvec, v); + return; + } + + /* + * Ok, we have now selected bounds of the groups. Now we have to distribute + * entries themselves. At first we distribute entries which can be placed + * unambiguously and collect "common entries" to array. + */ + + /* Allocate vectors for results */ + v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber)); + v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber)); + v->spl_nleft = 0; + v->spl_nright = 0; + + /* + * Allocate an array for "common entries" - entries which can be placed to + * either group without affecting overlap along selected axis. + */ + common_entries_count = 0; + common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry)); + + /* + * Distribute entries which can be distributed unambiguously, and collect + * common entries. + */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + RangeBound lower, + upper; + bool empty; + + /* + * Get upper and lower bounds along selected axis. + */ + range = DatumGetRangeType(entryvec->vector[i].key); + + range_deserialize(typcache, range, &lower, &upper, &empty); + + if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0) + { + /* Fits in the left group */ + if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0) + { + /* Fits also in the right group, so "common entry" */ + common_entries[common_entries_count].index = i; + if (context.has_subtype_diff) + { + /* + * delta = (lower - context.right_lower) - + * (context.left_upper - upper) + */ + common_entries[common_entries_count].delta = + call_subtype_diff(typcache, + lower.val, + context.right_lower->val) - + call_subtype_diff(typcache, + context.left_upper->val, + upper.val); + } + else + { + /* Without subtype_diff, take all deltas as zero */ + common_entries[common_entries_count].delta = 0; + } + common_entries_count++; + } + else + { + /* Doesn't fit to the right group, so join to the left group */ + PLACE_LEFT(range, i); + } + } + else + { + /* + * Each entry should fit on either left or right group. Since this + * entry didn't fit in the left group, it better fit in the right + * group. + */ + Assert(range_cmp_bounds(typcache, &lower, + context.right_lower) >= 0); + PLACE_RIGHT(range, i); + } + } + + /* + * Distribute "common entries", if any. + */ + if (common_entries_count > 0) + { + /* + * Sort "common entries" by calculated deltas in order to distribute + * the most ambiguous entries first. + */ + qsort(common_entries, common_entries_count, sizeof(CommonEntry), + common_entry_cmp); + + /* + * Distribute "common entries" between groups according to sorting. + */ + for (i = 0; i < common_entries_count; i++) + { + int idx = common_entries[i].index; + + range = DatumGetRangeType(entryvec->vector[idx].key); + + /* + * Check if we have to place this entry in either group to achieve + * LIMIT_RATIO. + */ + if (i < context.common_left) + PLACE_LEFT(range, idx); + else + PLACE_RIGHT(range, idx); + } + } + + v->spl_ldatum = PointerGetDatum(left_range); + v->spl_rdatum = PointerGetDatum(right_range); +} + +/* + * Consider replacement of currently selected split with a better one + * during range_gist_double_sorting_split. + */ +static void +range_gist_consider_split(ConsiderSplitContext *context, + RangeBound *right_lower, int min_left_count, + RangeBound *left_upper, int max_left_count) +{ + int left_count, + right_count; + float4 ratio, + overlap; + + /* + * Calculate entries distribution ratio assuming most uniform distribution + * of common entries. + */ + if (min_left_count >= (context->entries_count + 1) / 2) + left_count = min_left_count; + else if (max_left_count <= context->entries_count / 2) + left_count = max_left_count; + else + left_count = context->entries_count / 2; + right_count = context->entries_count - left_count; + + /* + * Ratio of split: quotient between size of smaller group and total + * entries count. This is necessarily 0.5 or less; if it's less than + * LIMIT_RATIO then we will never accept the new split. + */ + ratio = ((float4) Min(left_count, right_count)) / + ((float4) context->entries_count); + + if (ratio > LIMIT_RATIO) + { + bool selectthis = false; + + /* + * The ratio is acceptable, so compare current split with previously + * selected one. We search for minimal overlap (allowing negative + * values) and minimal ratio secondarily. If subtype_diff is + * available, it's used for overlap measure. Without subtype_diff we + * use number of "common entries" as an overlap measure. + */ + if (context->has_subtype_diff) + overlap = call_subtype_diff(context->typcache, + left_upper->val, + right_lower->val); + else + overlap = max_left_count - min_left_count; + + /* If there is no previous selection, select this split */ + if (context->first) + selectthis = true; + else + { + /* + * Choose the new split if it has a smaller overlap, or same + * overlap but better ratio. + */ + if (overlap < context->overlap || + (overlap == context->overlap && ratio > context->ratio)) + selectthis = true; + } + + if (selectthis) + { + /* save information about selected split */ + context->first = false; + context->ratio = ratio; + context->overlap = overlap; + context->right_lower = right_lower; + context->left_upper = left_upper; + context->common_left = max_left_count - left_count; + context->common_right = left_count - min_left_count; + } + } +} + +/* + * Find class number for range. + * + * The class number is a valid combination of the properties of the + * range. Note: the highest possible number is 8, because CLS_EMPTY + * can't be combined with anything else. + */ +static int +get_gist_range_class(RangeType *range) +{ + int classNumber; + char flags; + + flags = range_get_flags(range); + if (flags & RANGE_EMPTY) + { + classNumber = CLS_EMPTY; + } + else + { + classNumber = 0; + if (flags & RANGE_LB_INF) + classNumber |= CLS_LOWER_INF; + if (flags & RANGE_UB_INF) + classNumber |= CLS_UPPER_INF; + if (flags & RANGE_CONTAIN_EMPTY) + classNumber |= CLS_CONTAIN_EMPTY; + } + return classNumber; +} + +/* + * Comparison function for range_gist_single_sorting_split. + */ +static int +single_bound_cmp(const void *a, const void *b, void *arg) +{ + SingleBoundSortItem *i1 = (SingleBoundSortItem *) a; + SingleBoundSortItem *i2 = (SingleBoundSortItem *) b; + TypeCacheEntry *typcache = (TypeCacheEntry *) arg; + + return range_cmp_bounds(typcache, &i1->bound, &i2->bound); +} + +/* + * Compare NonEmptyRanges by lower bound. + */ +static int +interval_cmp_lower(const void *a, const void *b, void *arg) +{ + NonEmptyRange *i1 = (NonEmptyRange *) a; + NonEmptyRange *i2 = (NonEmptyRange *) b; + TypeCacheEntry *typcache = (TypeCacheEntry *) arg; + + return range_cmp_bounds(typcache, &i1->lower, &i2->lower); +} + +/* + * Compare NonEmptyRanges by upper bound. + */ +static int +interval_cmp_upper(const void *a, const void *b, void *arg) +{ + NonEmptyRange *i1 = (NonEmptyRange *) a; + NonEmptyRange *i2 = (NonEmptyRange *) b; + TypeCacheEntry *typcache = (TypeCacheEntry *) arg; + + return range_cmp_bounds(typcache, &i1->upper, &i2->upper); +} + +/* + * Compare CommonEntrys by their deltas. + */ +static int +common_entry_cmp(const void *i1, const void *i2) +{ + double delta1 = ((CommonEntry *) i1)->delta; + double delta2 = ((CommonEntry *) i2)->delta; + + if (delta1 < delta2) + return -1; + else if (delta1 > delta2) + return 1; + else + return 0; +} + +/* + * Convenience function to invoke type-specific subtype_diff function. + * Caller must have already checked that there is one for the range type. + */ +static float8 +call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2) +{ + float8 value; + + value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo, + typcache->rng_collation, + val1, val2)); + /* Cope with buggy subtype_diff function by returning zero */ + if (value >= 0.0) + return value; + return 0.0; }