phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject git commit: PHOENIX-1313 Investigate why LocalIndexIT.testLocalIndexScanAfterRegionSplit() is failing (Rajeshbabu)
Date Wed, 15 Oct 2014 18:40:23 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 8ece1a74a -> 050643e78


PHOENIX-1313 Investigate why LocalIndexIT.testLocalIndexScanAfterRegionSplit() is failing
(Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/050643e7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/050643e7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/050643e7

Branch: refs/heads/master
Commit: 050643e78d6e7cae7cdd0131353d87b0a7bb4507
Parents: 8ece1a7
Author: James Taylor <jtaylor@salesforce.com>
Authored: Wed Oct 15 11:43:47 2014 -0700
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Wed Oct 15 11:45:05 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/LocalIndexIT.java     | 11 ++-
 .../org/apache/phoenix/compile/ScanRanges.java  |  4 ++
 .../coprocessor/BaseScannerRegionObserver.java  | 20 +++---
 .../phoenix/iterate/ParallelIterators.java      | 73 +++++++++++++-------
 4 files changed, 68 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/050643e7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 376590a..ef3dc77 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -59,7 +59,6 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -633,7 +632,6 @@ public class LocalIndexIT extends BaseIndexIT {
     }
 
     @Test
-    @Ignore // TODO: ask Rajeshbabu to take a look
     public void testLocalIndexScanAfterRegionSplit() throws Exception {
         createBaseTable(DATA_TABLE_NAME, null, "('e','j','o')");
         Connection conn1 = DriverManager.getConnection(getUrl());
@@ -690,10 +688,11 @@ public class LocalIndexIT extends BaseIndexIT {
                 query = "SELECT t_id,k1,k3 FROM " + DATA_TABLE_NAME;
                 rs = conn1.createStatement().executeQuery("EXPLAIN "+query);
                 assertEquals(
-                    "CLIENT PARALLEL " + (4+i) + "-WAY RANGE SCAN OVER "
-                            + MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)+" [-32767]\n"+
-                            "CLIENT MERGE SORT",
-                    QueryUtil.getExplainPlan(rs));
+                    "CLIENT PARALLEL "
+                            + ((strings[3 * i].compareTo("j") < 0) ? (4 + i) : (4 + i - 1))
+                            + "-WAY RANGE SCAN OVER "
+                            + MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME) + " [-32767]\n"
+                            + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
                 rs = conn1.createStatement().executeQuery(query);
                 Thread.sleep(1000);
                 for (int j = 0; j < 26; j++) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/050643e7/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 923bcf3..0ab6368 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -347,6 +347,10 @@ public class ScanRanges {
                 scanStopKey = prefixKey(scanStopKey, scanKeyOffset, prefixBytes, keyOffset);
             }
         }
+        // Don't let the stopRow of the scan go beyond the originalStopKey
+        if (originalStopKey.length > 0 && Bytes.compareTo(scanStopKey, originalStopKey) > 0) {
+            scanStopKey = originalStopKey;
+        }
         if (scanStopKey.length > 0 && Bytes.compareTo(scanStartKey, scanStopKey) >= 0) { 
             return null; 
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/050643e7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d65beee..68fa3d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -88,18 +88,22 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver
{
     
     
     private static void throwIfScanOutOfRegion(Scan scan, HRegion region) throws DoNotRetryIOException
{
+        boolean isLocalIndex = ScanUtil.isLocalIndex(scan);
         byte[] lowerInclusiveScanKey = scan.getStartRow();
         byte[] upperExclusiveScanKey = scan.getStopRow();
         byte[] lowerInclusiveRegionKey = region.getStartKey();
         byte[] upperExclusiveRegionKey = region.getEndKey();
-        byte[] expectedUpperRegionKey = scan.getAttribute(EXPECTED_UPPER_REGION_KEY);
-        if (   (expectedUpperRegionKey != null && // local index check
-                  Bytes.compareTo(upperExclusiveRegionKey, expectedUpperRegionKey) != 0)
-            || (expectedUpperRegionKey == null && // non local index check
-                ( Bytes.compareTo(lowerInclusiveScanKey, lowerInclusiveRegionKey) < 0 ||
-                ( Bytes.compareTo(upperExclusiveScanKey, upperExclusiveRegionKey) > 0 && upperExclusiveRegionKey.length != 0) ) ) ) {
-            @SuppressWarnings("deprecation")
-            Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTableName());
+        boolean isStaleRegionBoundaries;
+        if (isLocalIndex) {
+            byte[] expectedUpperRegionKey = scan.getAttribute(EXPECTED_UPPER_REGION_KEY);
+            isStaleRegionBoundaries = expectedUpperRegionKey != null &&
+                    Bytes.compareTo(upperExclusiveRegionKey, expectedUpperRegionKey) != 0;
+        } else {
+            isStaleRegionBoundaries = Bytes.compareTo(lowerInclusiveScanKey, lowerInclusiveRegionKey) < 0 ||
+                    ( Bytes.compareTo(upperExclusiveScanKey, upperExclusiveRegionKey) > 0 && upperExclusiveRegionKey.length != 0);
+        }
+        if (isStaleRegionBoundaries) {
+            Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTable().getNameAsString());
             throw new DoNotRetryIOException(cause.getMessage(), cause);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/050643e7/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index fc13df5..8c33954 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -44,7 +47,6 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
@@ -65,7 +67,6 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.trace.util.Tracing;
-import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseables;
@@ -180,7 +181,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators
{
         doColumnProjectionOptimization(context, scan, table, statement);
         
         this.iteratorFactory = iteratorFactory;
-        this.scans = getParallelScans(context.getScan());
+        this.scans = getParallelScans();
         List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size()
* ESTIMATED_GUIDEPOSTS_PER_REGION);
         for (List<Scan> scanList : scans) {
             for (Scan aScan : scanList) {
@@ -384,6 +385,11 @@ public class ParallelIterators extends ExplainTable implements ResultIterators
{
         }
         return scans;
     }
+
+    private List<List<Scan>> getParallelScans() throws SQLException {
+        return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+    }
+
     /**
      * Compute the list of parallel scans to run for a given query. The inner scans
      * may be concatenated together directly, while the other ones may need to be
@@ -391,9 +397,11 @@ public class ParallelIterators extends ExplainTable implements ResultIterators
{
      * @return list of parallel scans to run for a given query.
      * @throws SQLException
      */
-    private List<List<Scan>> getParallelScans(final Scan scan) throws SQLException {
+    private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
+        Scan scan = context.getScan();
         List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
                 .getAllTableRegions(physicalTableName);
+        
         List<byte[]> regionBoundaries = toBoundaries(regionLocations);
         ScanRanges scanRanges = context.getScanRanges();
         PTable table = getTable();
@@ -404,25 +412,31 @@ public class ParallelIterators extends ExplainTable implements ResultIterators
{
             logger.debug("Guideposts: " + toString(gps));
         }
         boolean traverseAllRegions = isSalted || isLocalIndex;
+        if (!traverseAllRegions) {
+            byte[] scanStartRow = scan.getStartRow();
+            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
+                startKey = scanStartRow;
+            }
+            byte[] scanStopRow = scan.getStopRow();
+            if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) {
+                stopKey = scanStopRow;
+            }
+        }
         
-        byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
-        byte[] currentKey = ByteUtil.EMPTY_BYTE_ARRAY;
-        byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
         int regionIndex = 0;
         int stopIndex = regionBoundaries.size();
-        if (!traverseAllRegions) {
-            startKey = scan.getStartRow();
-            if (startKey.length > 0) {
-                currentKey = startKey;
-                regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
-            }
-            stopKey = scan.getStopRow();
-            if (stopKey.length > 0) {
-                stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+        if (startKey.length > 0) {
+            regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+        }
+        if (stopKey.length > 0) {
+            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+            if (isLocalIndex) {
+                stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
             }
         }
         List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
         
+        byte[] currentKey = startKey;
         int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
         int gpsSize = gps.size();
         int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
@@ -430,30 +444,31 @@ public class ParallelIterators extends ExplainTable implements ResultIterators
{
         List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
         // Merge bisect with guideposts for all but the last region
         while (regionIndex <= stopIndex) {
-            byte[] currentGuidePost, endRegionKey, endKey;
+            byte[] currentGuidePost, endKey, endRegionKey = EMPTY_BYTE_ARRAY;
             if (regionIndex == stopIndex) {
                 endKey = stopKey;
-                endRegionKey = ByteUtil.EMPTY_BYTE_ARRAY;
             } else {
-                endKey = endRegionKey = regionBoundaries.get(regionIndex);
+                endKey = regionBoundaries.get(regionIndex);
             }
             if (isLocalIndex) {
                 HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo();
-                keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey());
+                endRegionKey = regionInfo.getEndKey();
+                keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
             }
             while (guideIndex < gpsSize
                     && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey)
<= 0 || endKey.length == 0)) {
                 Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost,
keyOffset, false);
-                if (isLocalIndex && newScan != null) {
-                    newScan.setAttribute(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY,
endRegionKey);
-                }
                 scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false);
                 currentKey = currentGuidePost;
                 guideIndex++;
             }
             Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset,
true);
-            if (isLocalIndex && newScan != null) {
-                newScan.setAttribute(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY,
endRegionKey);
+            if (isLocalIndex) {
+                if (newScan != null) {
+                    newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
+                } else if (!scans.isEmpty()) {
+                    scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
+                }
             }
             scans = addNewScan(parallelScans, scans, newScan, endKey, true);
             currentKey = endKey;
@@ -489,6 +504,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators
{
     public List<PeekingResultIterator> getIterators() throws SQLException {
         boolean success = false;
         boolean isReverse = ScanUtil.isReversed(context.getScan());
+        boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
         final ConnectionQueryServices services = context.getConnection().getQueryServices();
         ReadOnlyProps props = services.getProps();
         int numSplits = size();
@@ -517,7 +533,12 @@ public class ParallelIterators extends ExplainTable implements ResultIterators
{
                             }
                             // Resubmit just this portion of work again
                             Scan oldScan = scanPair.getFirst();
-                            List<List<Scan>> newNestedScans = this.getParallelScans(oldScan);
+                            byte[] startKey = oldScan.getStartRow();
+                            byte[] endKey = oldScan.getStopRow();
+                            if (isLocalIndex) {
+                                endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
+                            }
+                            List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
                             // Add any concatIterators that were successful so far
                             // as we need these to be in order
                             addConcatResultIterator(iterators, concatIterators);


Mime
View raw message