phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajeshb...@apache.org
Subject [1/3] phoenix git commit: PHOENIX-2628 Ensure split when iterating through results handled correctly(Rajeshbabu)
Date Wed, 04 May 2016 13:20:58 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 99713a61c -> d700c1f03


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 696f051..f9e9913 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -20,6 +20,9 @@ package org.apache.phoenix.util;
 import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
 import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -32,6 +35,7 @@ import java.util.NavigableSet;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -103,6 +107,10 @@ public class ScanUtil {
         return scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX) != null;
     }
 
+    public static boolean isNonAggregateScan(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
+    }
+
     // Use getTenantId and pass in column name to match against
     // in as PSchema attribute. If column name matches in 
     // KeyExpressions, set on scan as attribute
@@ -618,6 +626,62 @@ public class ScanUtil {
         }
     }
 
+    /**
+     * prefix region start key to the start row/stop row suffix and set as scan boundaries.
+     * @param scan
+     * @param lowerInclusiveRegionKey
+     * @param upperExclusiveRegionKey
+     */
+    public static void setupLocalIndexScan(Scan scan, byte[] lowerInclusiveRegionKey,
+            byte[] upperExclusiveRegionKey) {
+        byte[] prefix = lowerInclusiveRegionKey.length == 0 ? new byte[upperExclusiveRegionKey.length]: lowerInclusiveRegionKey;
+        int prefixLength = lowerInclusiveRegionKey.length == 0? upperExclusiveRegionKey.length: lowerInclusiveRegionKey.length;
+        if(scan.getAttribute(SCAN_START_ROW_SUFFIX)!=null) {
+            scan.setStartRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_START_ROW_SUFFIX), 0, prefix, prefixLength));
+        }
+        if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX)!=null) {
+            scan.setStopRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_STOP_ROW_SUFFIX), 0, prefix, prefixLength));
+        }
+    }
+
+    public static byte[] getActualStartRow(Scan localIndexScan, HRegionInfo regionInfo) {
+        return localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX) == null ? localIndexScan
+                .getStartRow() : ScanRanges.prefixKey(localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX), 0 ,
+            regionInfo.getStartKey().length == 0 ? new byte[regionInfo.getEndKey().length]
+                    : regionInfo.getStartKey(),
+            regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length : regionInfo
+                    .getStartKey().length);
+    }
+
+    /**
+     * Set all attributes required and boundaries for local index scan.
+     * @param keyOffset
+     * @param regionStartKey
+     * @param regionEndKey
+     * @param newScan
+     */
+    public static void setLocalIndexAttributes(Scan newScan, int keyOffset, byte[] regionStartKey, byte[] regionEndKey, byte[] startRowSuffix, byte[] stopRowSuffix) {
+        if(ScanUtil.isLocalIndex(newScan)) {
+             newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionStartKey);
+             newScan.setStartRow(regionStartKey);
+             newScan.setStopRow(regionEndKey);
+             if (keyOffset > 0 ) {
+                 newScan.setAttribute(SCAN_START_ROW_SUFFIX, ScanRanges.stripPrefix(startRowSuffix, keyOffset));
+             } else {
+                 newScan.setAttribute(SCAN_START_ROW_SUFFIX, startRowSuffix);
+             }
+             if (keyOffset > 0) {
+                 newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, ScanRanges.stripPrefix(stopRowSuffix, keyOffset));
+             } else {
+                 newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, stopRowSuffix);
+             }
+         }
+    }
+
+    public static boolean isConextScan(Scan scan, StatementContext context) {
+        return Bytes.compareTo(context.getScan().getStartRow(), scan.getStartRow()) == 0 && Bytes
+                .compareTo(context.getScan().getStopRow(), scan.getStopRow()) == 0;
+    }
     public static int getRowKeyOffset(byte[] regionStartKey, byte[] regionEndKey) {
         return regionStartKey.length > 0 ? regionStartKey.length : regionEndKey.length;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index d61e9fe..c021b2c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -752,7 +752,7 @@ public abstract class BaseTest {
         conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, IndexLoadBalancer.class,
             LoadBalancer.class);
         conf.setClass("hbase.coprocessor.regionserver.classes", LocalIndexMerger.class,
-            RegionServerObserver.class);
+            RegionServerObserver.class) ;
         conf.setInt("dfs.namenode.handler.count", 2);
         conf.setInt("dfs.namenode.service.handler.count", 2);
         conf.setInt("dfs.datanode.handler.count", 2);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 47101b2..1da68ba 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -386,7 +386,12 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
                 return ResultIterator.EMPTY_ITERATOR;
             }
-            
+
+            @Override
+            public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+                return ResultIterator.EMPTY_ITERATOR;
+            }
+
             @Override
             public ResultIterator iterator() throws SQLException {
                 return ResultIterator.EMPTY_ITERATOR;
@@ -467,7 +472,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
                 return false;
             }
             
-        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan());
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
index f245840..5cdf234 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.hive.PhoenixRowKey;
 import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -115,8 +116,8 @@ public class PhoenixRecordReader<T extends DBWritable> implements
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes
                         .toBytes(true));
                 final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan
-                        .getContext().getConnection().getMutationState(), queryPlan.getTableRef(), scan,
-                        readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold);
+                        .getContext().getConnection().getMutationState(), scan,
+                        readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance()	);
 
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap
                         (tableResultIterator);


Mime
View raw message