phoenix-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [phoenix] dbwong commented on a change in pull request #482: PHOENIX-4925 Use Segment tree to organize Guide Post Info
Date Tue, 23 Apr 2019 23:17:29 GMT
dbwong commented on a change in pull request #482: PHOENIX-4925 Use Segment tree to organize Guide Post Info
URL: https://github.com/apache/phoenix/pull/482#discussion_r277902063
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 ##########
 @@ -885,292 +867,215 @@ private static boolean clipKeyRangeBytes(RowKeySchema schema, int fieldIndex, in
         return maxOffset != offset;
     }
 
+    private List<KeyRange> getRowKeyRanges(List<HRegionLocation> regionLocations, boolean isLocalIndex) {
+        List<KeyRange> queryKeyRanges = null;
+
+        // Use the dataplan to build the queryRowKeyRanges
+        if (isLocalIndex) {
+            // TODO: when implementing PHOENIX-4585, we should change this to an assert
+            // as we should always have a data plan when a local index is being used.
+            if (dataPlan != null && dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity check
+                int columnsInCommon = computeColumnsInCommon();
+                ScanRanges prefixScanRanges = computePrefixScanRanges(dataPlan.getContext().getScanRanges(), columnsInCommon);
+                List<KeyRange> queryKeyRangesTemp = prefixScanRanges.getRowKeyRanges();
+
+                if (queryKeyRangesTemp.size() == 1 && queryKeyRangesTemp.get(0) == KeyRange.EVERYTHING_RANGE) {
+                    queryKeyRanges = queryKeyRangesTemp;
+                } else {
+                    List<KeyRange> newQueryRowKeyRanges = Lists.newArrayListWithExpectedSize(
+                            queryKeyRangesTemp.size() * regionLocations.size());
+
+                    for (HRegionLocation regionLocation : regionLocations) {
+                        HRegionInfo regionInfo = regionLocation.getRegionInfo();
+
+                        // Only attempt further pruning if the prefix range is using
+                        // a skip scan since we've already pruned the range of regions
+                        // based on the start/stop key.
+                        if (columnsInCommon > 0 && prefixScanRanges.useSkipScanFilter()) {
+                            byte[] regionStartKey = regionInfo.getStartKey();
+                            ImmutableBytesWritable ptr = context.getTempPtr();
+                            clipKeyRangeBytes(prefixScanRanges.getSchema(), 0,
+                                    columnsInCommon, regionStartKey, ptr, false);
+                            regionStartKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                            // Prune this region if there's no intersection
+                            if (!prefixScanRanges.intersectRegion(regionStartKey, regionInfo.getEndKey(), false)) {
+                                continue;
+                            }
+                        }
+
+                        for (KeyRange queryKeyRange : queryKeyRangesTemp) {
+                            KeyRange newQueryRowKeyRange = queryKeyRange.prependRange(
+                                    regionInfo.getStartKey(),0,regionInfo.getStartKey().length);
+                            newQueryRowKeyRanges.add(newQueryRowKeyRange);
+                        }
+                    }
+
+                    queryKeyRanges = newQueryRowKeyRanges;
+                }
+            }
+        }
+
+        if (queryKeyRanges == null) {
+            ScanRanges scanRanges = context.getScanRanges();
+            queryKeyRanges = scanRanges.getRowKeyRanges();
+        }
+
+        return queryKeyRanges;
+    }
+
     /**
      * Compute the list of parallel scans to run for a given query. The inner scans
      * may be concatenated together directly, while the other ones may need to be
-     * merge sorted, depending on the query.
-     * Also computes an estimated bytes scanned, rows scanned, and last update time
-     * of statistics. To compute correctly, we need to handle a couple of edge cases:
-     * 1) if a guidepost is equal to the start key of the scan.
-     * 2) If a guidepost is equal to the end region key.
-     * In both cases, we set a flag (delayAddingEst) which indicates that the previous
-     * gp should be use in our stats calculation. The normal case is that a gp is
-     * encountered which is in the scan range in which case it is simply added to
-     * our calculation.
-     * For the last update time, we use the min timestamp of the gp that are in
-     * range of the scans that will be issued. If we find no gp in the range, we use
-     * the gp in the first or last region of the scan. If we encounter a region with
-     * no gp, then we return a null value as an indication that we don't know with
-     * certainty when the stats were updated last. This handles the case of a split
-     * occurring for a large ingest with stats never having been calculated for the
-     * new region.
+     * merge sorted, depending on the query. Also computes an estimated bytes scanned,
+     * rows scanned, and last update time of statistics.
+     *
      * @return list of parallel scans to run for a given query.
      * @throws SQLException
      */
     private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
+        ScanRanges scanRanges = context.getScanRanges();
         List<HRegionLocation> regionLocations = getRegionBoundaries(scanGrouper);
         List<byte[]> regionBoundaries = toBoundaries(regionLocations);
-        ScanRanges scanRanges = context.getScanRanges();
+
         PTable table = getTable();
         boolean isSalted = table.getBucketNum() != null;
         boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
-        GuidePostsInfo gps = getGuidePosts();
-        // case when stats wasn't collected
-        hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
-        // Case when stats collection did run but there possibly wasn't enough data. In such a
-        // case we generate an empty guide post with the byte estimate being set as guide post
-        // width.
-        boolean emptyGuidePost = gps.isEmptyGuidePost();
-        byte[] startRegionBoundaryKey = startKey;
-        byte[] stopRegionBoundaryKey = stopKey;
-        int columnsInCommon = 0;
-        ScanRanges prefixScanRanges = ScanRanges.EVERYTHING;
-        boolean traverseAllRegions = isSalted || isLocalIndex;
-        if (isLocalIndex) {
-            // TODO: when implementing PHOENIX-4585, we should change this to an assert
-            // as we should always have a data plan when a local index is being used.
-            if (dataPlan != null && dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity check
-                prefixScanRanges = computePrefixScanRanges(dataPlan.getContext().getScanRanges(), columnsInCommon=computeColumnsInCommon());
-                KeyRange prefixRange = prefixScanRanges.getScanRange();
-                if (!prefixRange.lowerUnbound()) {
-                    startRegionBoundaryKey = prefixRange.getLowerRange();
-                }
-                if (!prefixRange.upperUnbound()) {
-                    stopRegionBoundaryKey = prefixRange.getUpperRange();
-                }
+        // We'll never have a case where a table is both salted and local.
+        assert !(isSalted && isLocalIndex);
+
+        Long pageLimit = getUnfilteredPageLimit(scan);
+        boolean estimateUsingStats = ! (scanRanges.isPointLookup() || pageLimit != null);
+
+        GuidePostEstimation estimationFromStats = null;
+        GuidePostsInfo guidePostsInfo = getGuidePosts();
+        boolean hasGuidePosts = guidePostsInfo != GuidePostsInfo.NO_GUIDEPOST;
+        List<Pair<Integer, List<KeyRange>>> parallelScanRangesGroupedByRegion;
+
+        // Get the Query KeyRanges from skip filters
+        List<KeyRange> queryKeyRanges = getRowKeyRanges(regionLocations, isLocalIndex);
+
+        if (this.useStatsForParallelization && hasGuidePosts) {
+            Pair<List<KeyRange>, GuidePostEstimation> result = guidePostsInfo.generateParallelScanRanges(queryKeyRanges);
+
+            parallelScanRangesGroupedByRegion = ScanUtil.splitKeyRangesByBoundaries(regionBoundaries, result.getFirst());
+
+            if (estimateUsingStats) {
+                estimationFromStats = result.getSecond();
             }
-        } else if (!traverseAllRegions) {
-            byte[] scanStartRow = scan.getStartRow();
-            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
-                startRegionBoundaryKey = startKey = scanStartRow;
+        }
+        else {
+            // Get the Query KeyRanges which are grouped by region
+            List<Pair<Integer, List<KeyRange>>> queryKeyRangesGroupedByRegion =
+                    ScanUtil.splitKeyRangesByBoundaries(regionBoundaries, queryKeyRanges);
+            parallelScanRangesGroupedByRegion = Lists.newArrayListWithCapacity(queryKeyRangesGroupedByRegion.size());
+
+            for (Pair<Integer, List<KeyRange>> pair : queryKeyRangesGroupedByRegion) {
+                Integer regionIndex = pair.getFirst();
+                List<KeyRange> queryKeyRangesPerRegion = pair.getSecond();
+                assert (queryKeyRangesPerRegion.size() > 0);
+
+                assert (! queryKeyRangesPerRegion.get(queryKeyRangesPerRegion.size() - 1).isUpperInclusive());
+                byte[] endKey = queryKeyRangesPerRegion.get(queryKeyRangesPerRegion.size() - 1).getUpperRange();
+                KeyRange scanKeyRange = KeyRange.getKeyRange(queryKeyRangesPerRegion.get(0).getLowerRange(),
+                        queryKeyRangesPerRegion.get(0).isLowerInclusive(), endKey, false);
+                parallelScanRangesGroupedByRegion.add(
+                        new Pair<Integer, List<KeyRange>>(regionIndex, Lists.newArrayList(scanKeyRange)));
             }
-            byte[] scanStopRow = scan.getStopRow();
-            if (stopKey.length == 0
-                    || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
-                stopRegionBoundaryKey = stopKey = scanStopRow;
+
+            if (estimateUsingStats && hasGuidePosts) {
+                estimationFromStats = guidePostsInfo.getEstimationOnly(queryKeyRanges);
             }
         }
 
-        int regionIndex = 0;
-        int startRegionIndex = 0;
-        int stopIndex = regionBoundaries.size();
-        if (startRegionBoundaryKey.length > 0) {
-            startRegionIndex = regionIndex = getIndexContainingInclusive(regionBoundaries, startRegionBoundaryKey);
-        }
-        if (stopRegionBoundaryKey.length > 0) {
-            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopRegionBoundaryKey));
+        // Generate parallel scan for each region
+        int countOfRegionsToScan = parallelScanRangesGroupedByRegion.size();
+        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(countOfRegionsToScan);
+        List<Scan> regionScans = Lists.newArrayListWithExpectedSize(1);
+
+        for (int i = 0; i < countOfRegionsToScan; i++) {
+            Integer regionIndex = parallelScanRangesGroupedByRegion.get(i).getFirst();
+            List<KeyRange> regionScanKeyRanges = parallelScanRangesGroupedByRegion.get(i).getSecond();
+            int keyRangeCount = regionScanKeyRanges.size();
+            assert (keyRangeCount > 0);
+
+            HRegionLocation regionLocation = regionLocations.get(regionIndex);
+            HRegionInfo regionInfo = regionLocation.getRegionInfo();
+
+            int keyOffset = 0;
             if (isLocalIndex) {
-                stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
+                keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey());
             }
-        }
-        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
 
-        ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
-
-        int gpsSize = gps.getGuidePostsCount();
-        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
-        int keyOffset = 0;
-        ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
-        List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
-        ImmutableBytesWritable guidePosts = gps.getGuidePosts();
-        ByteArrayInputStream stream = null;
-        DataInput input = null;
-        PrefixByteDecoder decoder = null;
-        int guideIndex = 0;
-        GuidePostEstimate estimates = new GuidePostEstimate();
-        boolean gpsForFirstRegion = false;
-        boolean intersectWithGuidePosts = true;
-        // Maintain min ts for gps in first or last region outside of
-        // gps that are in the scan range. We'll use this if we find
-        // no gps in range.
-        long fallbackTs = Long.MAX_VALUE;
-        // Determination of whether of not we found a guidepost in
-        // every region between the start and stop key. If not, then
-        // we cannot definitively say at what time the guideposts
-        // were collected.
-        boolean gpsAvailableForAllRegions = true;
-        try {
-            boolean delayAddingEst = false;
-            ImmutableBytesWritable firstRegionStartKey = null;
-            if (gpsSize > 0) {
-                stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
-                input = new DataInputStream(stream);
-                decoder = new PrefixByteDecoder(gps.getMaxLength());
-                firstRegionStartKey = new ImmutableBytesWritable(regionLocations.get(regionIndex).getRegionInfo().getStartKey());
-                try {
-                    int c;
-                    // Continue walking guideposts until we get past the currentKey
-                    while ((c=currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input))) >= 0) {
-                        // Detect if we found a guidepost that might be in the first region. This
-                        // is for the case where the start key may be past the only guidepost in
-                        // the first region.
-                        if (!gpsForFirstRegion && firstRegionStartKey.compareTo(currentGuidePost) <= 0) {
-                            gpsForFirstRegion = true;
-                        }
-                        // While we have gps in the region (but outside of start/stop key), track
-                        // the min ts as a fallback for the time at which stas were calculated.
-                        if (gpsForFirstRegion) {
-                            fallbackTs =
-                                    Math.min(fallbackTs,
-                                        gps.getGuidePostTimestamps()[guideIndex]);
-                        }
-                        // Special case for gp == startKey in which case we want to
-                        // count this gp (if it's in range) though we go past it.
-                        delayAddingEst = (c == 0);
-                        guideIndex++;
-                    }
-                } catch (EOFException e) {
-                    // expected. Thrown when we have decoded all guide posts.
-                    intersectWithGuidePosts = false;
-                }
-            }
-            byte[] endRegionKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
-            byte[] currentKeyBytes = currentKey.copyBytes();
-            intersectWithGuidePosts &= guideIndex < gpsSize;
-            // Merge bisect with guideposts for all but the last region
-            while (regionIndex <= stopIndex) {
-                HRegionLocation regionLocation = regionLocations.get(regionIndex);
-                HRegionInfo regionInfo = regionLocation.getRegionInfo();
-                byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
-                byte[] endKey;
-                if (regionIndex == stopIndex) {
-                    endKey = stopKey;
-                } else {
-                    endKey = regionBoundaries.get(regionIndex);
-                }
-                if (isLocalIndex) {
-                    // Only attempt further pruning if the prefix range is using
-                    // a skip scan since we've already pruned the range of regions
-                    // based on the start/stop key.
-                    if (columnsInCommon > 0 && prefixScanRanges.useSkipScanFilter()) {
-                        byte[] regionStartKey = regionInfo.getStartKey();
-                        ImmutableBytesWritable ptr = context.getTempPtr();
-                        clipKeyRangeBytes(prefixScanRanges.getSchema(), 0, columnsInCommon, regionStartKey, ptr, false);
-                        regionStartKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                        // Prune this region if there's no intersection
-                        if (!prefixScanRanges.intersectRegion(regionStartKey, regionInfo.getEndKey(), false)) {
-                            currentKeyBytes = endKey;
-                            regionIndex++;
-                            continue;
-                        }
-                    }
-                    keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey());
-                }
-                byte[] initialKeyBytes = currentKeyBytes;
-                int gpsComparedToEndKey = -1;
-                boolean everNotDelayed = false;
-                while (intersectWithGuidePosts && (endKey.length == 0 || (gpsComparedToEndKey=currentGuidePost.compareTo(endKey)) <= 0)) {
-                    Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
-                        false);
-                    if (newScan != null) {
-                        ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
-                            regionInfo.getStartKey(), regionInfo.getEndKey(),
-                            newScan.getStartRow(), newScan.getStopRow());
-                        // If we've delaying adding estimates, add the previous
-                        // gp estimates now that we know they are in range.
-                        if (delayAddingEst) {
-                            updateEstimates(gps, guideIndex-1, estimates);
-                        }
-                        // If we're not delaying adding estimates, add the
-                        // current gp estimates.
-                        if (! (delayAddingEst = gpsComparedToEndKey == 0) ) {
-                            updateEstimates(gps, guideIndex, estimates);
-                        }
-                    } else {
-                        delayAddingEst = false;
-                    }
-                    everNotDelayed |= !delayAddingEst;
-                    scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
-                    currentKeyBytes = currentGuidePostBytes;
-                    try {
-                        currentGuidePost = PrefixByteCodec.decode(decoder, input);
-                        currentGuidePostBytes = currentGuidePost.copyBytes();
-                        guideIndex++;
-                    } catch (EOFException e) {
-                        // We have read all guide posts
-                        intersectWithGuidePosts = false;
-                    }
-                }
-                boolean gpsInThisRegion = initialKeyBytes != currentKeyBytes;
-                if (!useStatsForParallelization) {
-                    /*
-                     * If we are not using stats for generating parallel scans, we need to reset the
-                     * currentKey back to what it was at the beginning of the loop.
-                     */
-                    currentKeyBytes = initialKeyBytes;
+            // Make a scan for every range in regionScanKeyRanges
+            for (int j = 0; j < keyRangeCount; j++) {
+                KeyRange keyRange = regionScanKeyRanges.get(j);
+                boolean crossesRegionBoundary = false;
+
+                if (j == keyRangeCount - 1) {
+                    crossesRegionBoundary = true;
 
 Review comment:
   My understanding is that crossesRegionBoundary is only needed for local indexes or salted
 tables. Maybe we should rearrange the code to make that explicit? It is also used in addNewScan
to force-add the scan when useStatsForParallelization is off, but again I believe that should
only be true for local indexes or salted tables. Ideally, maybe we could remove that logic from
addNewScan altogether?
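
 As a rough, standalone sketch of the rearrangement suggested above (not the actual Phoenix
code): the flag could be derived from the table properties up front, so that it is only ever
true for local indexes or salted tables. The class and method names below are hypothetical,
and the sketch assumes that nothing else in addNewScan depends on the flag for plain tables,
which would need to be checked against the real code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Phoenix.
final class CrossesRegionBoundarySketch {

    /**
     * Returns true only for the last key range of a region, and only when the
     * table is a local index or is salted (the assumption from the comment above).
     */
    static boolean crossesRegionBoundary(boolean isLocalIndex, boolean isSalted,
                                         int rangeIndex, int keyRangeCount) {
        boolean needsRegionBoundary = isLocalIndex || isSalted;
        return needsRegionBoundary && rangeIndex == keyRangeCount - 1;
    }

    /** Computes the flag for every key range of a single region. */
    static List<Boolean> flagsForRegion(boolean isLocalIndex, boolean isSalted,
                                        int keyRangeCount) {
        List<Boolean> flags = new ArrayList<>(keyRangeCount);
        for (int j = 0; j < keyRangeCount; j++) {
            flags.add(crossesRegionBoundary(isLocalIndex, isSalted, j, keyRangeCount));
        }
        return flags;
    }

    public static void main(String[] args) {
        // Local index: only the last range in the region is flagged.
        System.out.println(flagsForRegion(true, false, 3));  // [false, false, true]
        // Plain table: the flag is never set.
        System.out.println(flagsForRegion(false, false, 3)); // [false, false, false]
    }
}
```

 With the condition stated in one place like this, the special case in addNewScan might become
unnecessary, but that depends on how the flag is consumed there.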

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
