phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/5] PHOENIX-1251 Salted queries with range scan become full table scans
Date Sat, 04 Oct 2014 00:28:23 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 4da593f..59fb082 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,12 +17,9 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
-
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -36,37 +33,39 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseables;
@@ -77,6 +76,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 
@@ -90,7 +90,10 @@ import com.google.common.collect.Lists;
  */
 public class ParallelIterators extends ExplainTable implements ResultIterators {
 	private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
+    private final List<List<Scan>> scans;
     private final List<KeyRange> splits;
+    private final PTable physicalTable;
+    private final QueryPlan plan;
     private final ParallelIteratorFactory iteratorFactory;
     
     public static interface ParallelIteratorFactory {
@@ -98,6 +101,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
     }
 
     private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
+    private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
 
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
         @Override
@@ -106,10 +110,14 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         }
     };
 
-    public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement,
-            RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
             throws SQLException {
-        super(context, tableRef, groupBy);
+        super(plan.getContext(), plan.getTableRef(), plan.getGroupBy());
+        this.plan = plan;
+        StatementContext context = plan.getContext();
+        TableRef tableRef = plan.getTableRef();
+        FilterableStatement statement = plan.getStatement();
+        RowProjector projector = plan.getProjector();
         MetaDataClient client = new MetaDataClient(context.getConnection());
         PTable physicalTable = tableRef.getTable();
         String physicalName = tableRef.getTable().getPhysicalName().getString();
@@ -128,8 +136,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
                         .getTable(new PTableKey(null, physicalTableName));
             }
         }
-        this.splits = getSplits(context, physicalTable, statement.getHint());
-        this.iteratorFactory = iteratorFactory;
+        this.physicalTable = physicalTable;
         Scan scan = context.getScan();
         PTable table = tableRef.getTable();
         if (projector.isProjectEmptyKeyValue()) {
@@ -154,17 +161,30 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
                 }
             }
         } else if (table.getViewType() == ViewType.MAPPED) {
-                // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
-                // selected column values are returned back to client
-                for (PColumnFamily family : table.getColumnFamilies()) {
-                    scan.addFamily(family.getName().getBytes());
-                }
-        } // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
-        if (limit != null) {
-            ScanUtil.andFilterAtEnd(scan, new PageFilter(limit));
+            // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
+            // selected column values are returned back to client
+            for (PColumnFamily family : table.getColumnFamilies()) {
+                scan.addFamily(family.getName().getBytes());
+            }
+        }
+        
+        // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
+        if (perScanLimit != null) {
+            ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
         }
 
         doColumnProjectionOptimization(context, scan, table, statement);
+        
+        this.iteratorFactory = iteratorFactory;
+        this.scans = getParallelScans(context.getScan());
+        List<List<Scan>> scans = getParallelScans(context.getScan());
+        List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
+        for (List<Scan> scanList : scans) {
+            for (Scan aScan : scanList) {
+                splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
+            }
+        }
+        this.splits = ImmutableList.copyOf(splitRanges);
     }
 
     private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
@@ -247,29 +267,218 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         }
     }
 
+    public List<KeyRange> getSplits() {
+        return splits;
+    }
+
+    private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
+        int nBoundaries = regionLocations.size() - 1;
+        List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
+        for (int i = 0; i < nBoundaries; i++) {
+            HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
+            ranges.add(regionInfo.getEndKey());
+        }
+        return ranges;
+    }
+    
+    private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index+1, as the inclusiveKey will be contained
+        // in the next region (since we're matching on the end boundary).
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
+        return guideIndex;
+    }
+    
+    private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index we found as the exclusiveKey won't be
+        // contained in the next region as with getIndexContainingInclusive.
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
+        return guideIndex;
+    }
+    
+    private List<byte[]> getGuidePosts(PTable table) {
+        Scan scan = context.getScan();
+        boolean isPointLookup = context.getScanRanges().isPointLookup();
+        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
+        List<byte[]> gps = Collections.emptyList();
+        /*
+         *  Don't use guide posts if:
+         *  1) We're doing a point lookup, as HBase is fast enough at those
+         *     to not need them to be further parallelized. TODO: perf test to verify
+         *  2) We're collecting stats, as in this case we need to scan entire
+         *     regions worth of data to track where to put the guide posts.
+         */
+        if (!isPointLookup && !ScanUtil.isAnalyzeTable(scan)) {
+            if (table.getColumnFamilies().isEmpty()) {
+                // For sure we can get the defaultCF from the table
+                return table.getGuidePosts();
+            }
+            try {
+                if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
+                    // If default CF is not used in scan, use first CF referenced in scan
+                    return table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
+                }
+                // Otherwise, favor use of default CF.
+                return table.getColumnFamily(defaultCF).getGuidePosts();
+            } catch (ColumnFamilyNotFoundException cfne) {
+                // Alter table does this
+            }
+        }
+        return gps;
+        
+    }
+    
+    private static String toString(List<byte[]> gps) {
+        StringBuilder buf = new StringBuilder(gps.size() * 100);
+        buf.append("[");
+        for (int i = 0; i < gps.size(); i++) {
+            buf.append(Bytes.toStringBinary(gps.get(i)));
+            buf.append(",");
+            if (i < gps.size()-1 && (i % 10) == 0) {
+                buf.append("\n");
+            }
+        }
+        buf.setCharAt(buf.length()-1, ']');
+        return buf.toString();
+    }
+    
+    private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, boolean crossedRegionBoundary) {
+        if (scan == null) {
+            return scans;
+        }
+        if (!scans.isEmpty()) {
+            boolean startNewScanList = false;
+            if (!plan.isRowKeyOrdered()) {
+                startNewScanList = true;
+            } else if (crossedRegionBoundary) {
+                if (physicalTable.getIndexType() == IndexType.LOCAL) {
+                    startNewScanList = true;
+                } else if (physicalTable.getBucketNum() != null) {
+                    byte[] previousStartKey = scans.get(scans.size()-1).getStartRow();
+                    byte[] currentStartKey = scan.getStartRow();
+                    byte[] prefix = ScanUtil.getPrefix(previousStartKey, SaltingUtil.NUM_SALTING_BYTES);
+                    startNewScanList = ScanUtil.crossesPrefixBoundary(currentStartKey, prefix, SaltingUtil.NUM_SALTING_BYTES);
+                }
+            }
+            if (startNewScanList) {
+                parallelScans.add(scans);
+                scans = Lists.newArrayListWithExpectedSize(1);
+            }
+        }
+        scans.add(scan);
+        return scans;
+    }
     /**
-     * Splits the given scan's key range so that each split can be queried in parallel
-     * @param hintNode TODO
-     *
-     * @return the key ranges that should be scanned in parallel
+     * Compute the list of parallel scans to run for a given query. The inner scans
+     * may be concatenated together directly, while the other ones may need to be
+     * merge sorted, depending on the query.
+     * @return list of parallel scans to run for a given query.
+     * @throws SQLException
      */
-    // exposed for tests
-    public static List<KeyRange> getSplits(StatementContext context, PTable table, HintNode hintNode) throws SQLException {
-        return ParallelIteratorRegionSplitterFactory.getSplitter(context, table, hintNode).getSplits();
+    private List<List<Scan>> getParallelScans(final Scan scan) throws SQLException {
+        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+                .getAllTableRegions(physicalTable.getPhysicalName().getBytes());
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        ScanRanges scanRanges = context.getScanRanges();
+        boolean isSalted = physicalTable.getBucketNum() != null;
+        boolean isLocalIndex = physicalTable.getIndexType() == IndexType.LOCAL;
+        List<byte[]> gps = getGuidePosts(physicalTable);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Guideposts: " + toString(gps));
+        }
+        boolean traverseAllRegions = isSalted || isLocalIndex;
+        
+        byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] currentKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        int regionIndex = 0;
+        int stopIndex = regionBoundaries.size();
+        if (!traverseAllRegions) {
+            startKey = scan.getStartRow();
+            if (startKey.length > 0) {
+                currentKey = startKey;
+                regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+            }
+            stopKey = scan.getStopRow();
+            if (stopKey.length > 0) {
+                stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+            }
+        }
+        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+        
+        int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
+        int gpsSize = gps.size();
+        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
+        int keyOffset = 0;
+        List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
+        // Merge bisect with guideposts for all but the last region
+        while (regionIndex <= stopIndex) {
+            byte[] currentGuidePost;
+            byte[] endKey = regionIndex == stopIndex ? stopKey : regionBoundaries.get(regionIndex);
+            if (isLocalIndex) {
+                HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo();
+                keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey());
+            }
+            while (guideIndex < gpsSize
+                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
+                Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset);
+                scans = addNewScan(parallelScans, scans, newScan, false);
+                currentKey = currentGuidePost;
+                guideIndex++;
+            }
+            Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset);
+            scans = addNewScan(parallelScans, scans, newScan, true);
+            currentKey = endKey;
+            regionIndex++;
+        }
+        if (!scans.isEmpty()) { // Add any remaining scans
+            parallelScans.add(scans);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans,
+                    ScanUtil.getCustomAnnotations(scan)));
+        }
+        return parallelScans;
     }
 
-    private static List<KeyRange> toKeyRanges(List<HRegionLocation> regions) {
-        List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(regions.size());
-        for (HRegionLocation region : regions) {
-            keyRanges.add(TO_KEY_RANGE.apply(region));
+    private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
+        if (!concatIterators.isEmpty()) {
+            if (concatIterators.size() == 1) {
+                iterators.add(concatIterators.get(0));
+            } else {
+                // TODO: should ConcatResultIterator have a constructor that takes
+                // a List<PeekingResultIterator>?
+                iterators.add(new ConcatResultIterator(new ResultIterators() {
+    
+                    @Override
+                    public List<PeekingResultIterator> getIterators() throws SQLException {
+                        return concatIterators;
+                    }
+    
+                    @Override
+                    public int size() {
+                        return concatIterators.size();
+                    }
+    
+                    @Override
+                    public void explain(List<String> planSteps) {
+                    // TODO: review what we should do for explain plan here
+                        concatIterators.get(0).explain(planSteps);
+                    }
+                    
+                }));
+            }
         }
-        return keyRanges;
     }
     
-    public List<KeyRange> getSplits() {
-        return splits;
+    public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
+        if (!reverse) {
+            return list;
+        }
+        return Lists.reverse(list);
     }
-
+    
     /**
      * Executes the scan in parallel across all regions, blocking until all scans are complete.
      * @return the result iterators for the scan of each region
@@ -277,53 +486,54 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
     @Override
     public List<PeekingResultIterator> getIterators() throws SQLException {
         boolean success = false;
+        boolean isReverse = ScanUtil.isReversed(context.getScan());
         final ConnectionQueryServices services = context.getConnection().getQueryServices();
         ReadOnlyProps props = services.getProps();
-        int numSplits = splits.size();
+        int numSplits = size();
         List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
-        List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits);
+        List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+        // TODO: what purpose does this scanID serve?
         final UUID scanId = UUID.randomUUID();
         try {
-            submitWork(scanId, splits, futures);
+            submitWork(scanId, scans, futures, splits.size());
             int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
-            final int factor = ScanUtil.isReversed(this.context.getScan()) ? -1 : 1;
-            // Sort futures by row key so that we have a predictable order we're getting rows back for scans.
-            // We're going to wait here until they're finished anyway and this makes testing much easier.
-            Collections.sort(futures, new Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() {
-                @Override
-                public int compare(Pair<KeyRange, Future<PeekingResultIterator>> o1, Pair<KeyRange, Future<PeekingResultIterator>> o2) {
-                    return factor * Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange());
-                }
-            });
             boolean clearedCache = false;
-            byte[] tableName = tableRef.getTable().getPhysicalName().getBytes();
-            for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
-                try {
-                    PeekingResultIterator iterator = future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                    iterators.add(iterator);
-                } catch (ExecutionException e) {
-                    try { // Rethrow as SQLException
-                        throw ServerUtil.parseServerException(e);
-                    } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
-                        List<Pair<KeyRange,Future<PeekingResultIterator>>> newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2);
-                        if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
-                            services.clearTableRegionCache(tableName);
-                            clearedCache = true;
-                        }
-                        List<KeyRange> allSplits = toKeyRanges(services.getAllTableRegions(tableName));
-                        // Intersect what was the expected boundary with all new region boundaries and
-                        // resubmit just this portion of work again
-                        List<KeyRange> newSubSplits = KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits);
-                        submitWork(scanId, newSubSplits, newFutures);
-                        for (Pair<KeyRange,Future<PeekingResultIterator>> newFuture : newFutures) {
-                            // Immediate do a get (not catching exception again) and then add the iterators we
-                            // get back immediately. They'll be sorted as expected, since they're replacing the
-                            // original one.
-                            PeekingResultIterator iterator = newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                            iterators.add(iterator);
+            for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
+                List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
+                for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
+                    try {
+                        PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                        concatIterators.add(iterator);
+                    } catch (ExecutionException e) {
+                        try { // Rethrow as SQLException
+                            throw ServerUtil.parseServerException(e);
+                        } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+                            List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
+                            if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
+                                services.clearTableRegionCache(physicalTable.getName().getBytes());
+                                clearedCache = true;
+                            }
+                            // Resubmit just this portion of work again
+                            Scan oldScan = scanPair.getFirst();
+                            List<List<Scan>> newNestedScans = this.getParallelScans(oldScan);
+                            // Add any concatIterators that were successful so far
+                            // as we need these to be in order
+                            addConcatResultIterator(iterators, concatIterators);
+                            concatIterators = Collections.emptyList();
+                            submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
+                            for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
+                                for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
+                                    // Immediately do a get (not catching exception again) and then add the iterators we
+                                    // get back immediately. They'll be sorted as expected, since they're replacing the
+                                    // original one.
+                                    PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                                    iterators.add(iterator);
+                                }
+                            }
                         }
                     }
                 }
+                addConcatResultIterator(iterators, concatIterators);
             }
 
             success = true;
@@ -343,70 +553,80 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         }
     }
     
-    private void submitWork(final UUID scanId, List<KeyRange> splits,
-            List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) {
+    private static final class ScanLocation {
+    	private final int outerListIndex;
+    	private final int innerListIndex;
+    	private final Scan scan;
+    	public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) {
+    		this.outerListIndex = outerListIndex;
+    		this.innerListIndex = innerListIndex;
+    		this.scan = scan;
+    	}
+    	public int getOuterListIndex() {
+    		return outerListIndex;
+    	}
+    	public int getInnerListIndex() {
+    		return innerListIndex;
+    	}
+    	public Scan getScan() {
+    		return scan;
+    	}
+    }
+    private void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+            List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) {
         final ConnectionQueryServices services = context.getConnection().getQueryServices();
         ExecutorService executor = services.getExecutor();
-        final boolean localIndex = this.tableRef.getTable().getType() == PTableType.INDEX && this.tableRef.getTable().getIndexType() == IndexType.LOCAL;
-        for (final KeyRange split : splits) {
-            final Scan splitScan = ScanUtil.newScan(context.getScan());
-            // Intersect with existing start/stop key if the table is salted
-            // If not salted, we've already intersected it. If salted, we need
-            // to wait until now to intersect, as we're running parallel scans
-            // on all the possible regions here.
-            if (tableRef.getTable().getBucketNum() != null) {
-                KeyRange minMaxRange = context.getMinMaxRange();
-                if (minMaxRange != null) {
-                    // Add salt byte based on current split, as minMaxRange won't have it
-                    minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
-                    // FIXME: seems like this should be possible when we set the scan start/stop
-                    // in StatementContext.setScanRanges(). If it doesn't intersect the range for
-                    // one salt byte, I don't see how it could intersect it with any of them.
-                    if (!ScanUtil.intersectScanRange(splitScan, minMaxRange.getLowerRange(), minMaxRange.getUpperRange())) {
-                        continue; // Skip this chunk if no intersection based on minMaxRange
-                    }
-                }
-            } else if (localIndex) {
-                // Used to detect stale region boundary information on server side
-                splitScan.setAttribute(EXPECTED_UPPER_REGION_KEY, split.getUpperRange());
-                if (splitScan.getStartRow().length != 0 || splitScan.getStopRow().length != 0) {
-                    SaltingUtil.addRegionStartKeyToScanStartAndStopRows(split.getLowerRange(),split.getUpperRange(),
-                        splitScan);
-                }
-            } 
-            if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
-                Future<PeekingResultIterator> future =
-                    executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
+        // Pre-populate nestedFutures lists so that we can shuffle the scans
+        // and add the future to the right nested list. By shuffling the scans
+        // we get better utilization of the cluster since our thread executor
+        // will spray the scans across machines as opposed to targeting a
+        // single one since the scans are in row key order.
+        List<ScanLocation> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
+        for (int i = 0; i < nestedScans.size(); i++) {
+            List<Scan> scans = nestedScans.get(i);
+            List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size());
+            nestedFutures.add(futures);
+            for (int j = 0; j < scans.size(); j++) {
+            	Scan scan = nestedScans.get(i).get(j);
+                scanLocations.add(new ScanLocation(scan, i, j));
+                futures.add(null); // placeholder
+            }
+        }
+        Collections.shuffle(scanLocations);
+        for (ScanLocation scanLocation : scanLocations) {
+            final Scan scan = scanLocation.getScan();
+            Future<PeekingResultIterator> future =
+                executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
 
-                    @Override
-                    public PeekingResultIterator call() throws Exception {
-                        long startTime = System.currentTimeMillis();
-                        ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan);
-                        if (logger.isDebugEnabled()) {
-                            logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan, ScanUtil.getCustomAnnotations(splitScan)));
-                        }
-                        return iteratorFactory.newIterator(context, scanner, splitScan);
+                @Override
+                public PeekingResultIterator call() throws Exception {
+                    long startTime = System.currentTimeMillis();
+                    ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                     }
+                    return iteratorFactory.newIterator(context, scanner, scan);
+                }
 
-                    /**
-                     * Defines the grouping for round robin behavior.  All threads spawned to process
-                     * this scan will be grouped together and time sliced with other simultaneously
-                     * executing parallel scans.
-                     */
-                    @Override
-                    public Object getJobId() {
-                        return ParallelIterators.this;
-                    }
-                }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
-                futures.add(new Pair<KeyRange,Future<PeekingResultIterator>>(split,future));
-            }
+                /**
+                 * Defines the grouping for round robin behavior.  All threads spawned to process
+                 * this scan will be grouped together and time sliced with other simultaneously
+                 * executing parallel scans.
+                 */
+                @Override
+                public Object getJobId() {
+                    return ParallelIterators.this;
+                }
+            }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
+            // Add our future in the right place so that we can concatenate the
+            // results of the inner futures versus merge sorting across all of them.
+            nestedFutures.get(scanLocation.getOuterListIndex()).set(scanLocation.getInnerListIndex(), new Pair<Scan,Future<PeekingResultIterator>>(scan,future));
         }
-
     }
 
     @Override
     public int size() {
-        return this.splits.size();
+        return this.scans.size();
     }
 
     @Override
@@ -418,6 +638,6 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
 
 	@Override
 	public String toString() {
-		return "ParallelIterators [splits=" + splits + "]";
+		return "ParallelIterators [scans=" + scans + "]";
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
deleted file mode 100644
index 81f5af6..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.iterate;
-
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SaltingUtil;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-
-/**
- * Split the region according to the information contained in the scan's SkipScanFilter.
- */
-public class SkipRangeParallelIteratorRegionSplitter extends DefaultParallelIteratorRegionSplitter {
-
-    public static SkipRangeParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) {
-        return new SkipRangeParallelIteratorRegionSplitter(context, table, hintNode);
-    }
-
-    protected SkipRangeParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) {
-        super(context, table, hintNode);
-    }
-
-    @Override
-    protected List<HRegionLocation> getAllRegions() throws SQLException {
-        List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes());
-        return filterRegions(allTableRegions, context.getScanRanges());
-    }
-
-    public List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, final ScanRanges ranges) {
-        Iterable<HRegionLocation> regions;
-        if (ranges == ScanRanges.EVERYTHING) {
-            return allTableRegions;
-        } else if (ranges == ScanRanges.NOTHING) { // TODO: why not emptyList?
-            return Lists.<HRegionLocation>newArrayList();
-        } else {
-            regions = Iterables.filter(allTableRegions,
-                    new Predicate<HRegionLocation>() {
-                    @Override
-                    public boolean apply(HRegionLocation region) {
-                        KeyRange minMaxRange = context.getMinMaxRange();
-                        if (minMaxRange != null) {
-                            KeyRange range = KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
-                            if (table.getBucketNum() != null) {
-                                // Add salt byte, as minMaxRange won't have it
-                                minMaxRange = SaltingUtil.addSaltByte(region.getRegionInfo().getStartKey(), minMaxRange);
-                            }
-                            range = range.intersect(minMaxRange);
-                            return ranges.intersect(range.getLowerRange(), range.getUpperRange());
-                        }
-                        return ranges.intersect(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
-                    }
-            });
-        }
-        return Lists.newArrayList(regions);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f7d6e14..4f67d4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -432,6 +432,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public boolean isDegenerate() {
                     return false;
                 }
+
+                @Override
+                public boolean isRowKeyOrdered() {
+                    return true;
+                }
                 
             };
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
index 68f786a..afcc741 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
@@ -210,6 +210,10 @@ public class KeyRange implements Writable {
         return compareLowerToUpperBound(b,o,l,true);
     }
 
+    public int compareLowerToUpperBound( byte[] b) {
+        return compareLowerToUpperBound(b,0,b.length);
+    }
+
     /**
      * Compares a lower bound against an upper bound
      * @param b upper bound byte array
@@ -237,6 +241,10 @@ public class KeyRange implements Writable {
         return 1;
     }
     
+    public int compareUpperToLowerBound(byte[] b) {
+        return compareUpperToLowerBound(b,0,b.length);
+    }
+    
     public int compareUpperToLowerBound(byte[] b, int o, int l) {
         return compareUpperToLowerBound(b,o,l, true);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java
deleted file mode 100644
index df55fb5..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.query;
-
-import java.sql.SQLException;
-
-import org.apache.phoenix.schema.TableRef;
-
-
-/**
- * 
- * Interface for managing and caching table statistics.
- * The frequency of updating the table statistics are controlled
- * by {@link org.apache.phoenix.query.QueryServices#STATS_UPDATE_FREQ_MS_ATTRIB}.
- * Table stats may also be manually updated through {@link #updateStats(TableRef)}.
- * 
- *
- * 
- * @since 0.1
- */
-public interface StatsManager {
-    /**
-     * Get the minimum key for the given table
-     * @param table the table
-     * @return the minimum key or null if unknown
-     */
-    byte[] getMinKey(TableRef table);
-    
-    /**
-     * Get the maximum key for the given table
-     * @param table the table
-     * @return the maximum key or null if unknown
-     */
-    byte[] getMaxKey(TableRef table);
-    
-    /**
-     * Manually update the cached table statistics
-     * @param table the table
-     * @throws SQLException
-     */
-    void updateStats(TableRef table) throws SQLException;
-    
-    void clearStats() throws SQLException;
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
deleted file mode 100644
index 905802b..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.query;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TimeKeeper;
-
-
-/**
- * 
- * Implementation of StatsManager. Table stats are updated asynchronously when they're
- * accessed and past time-to-live. In this case, future calls (after the asynchronous
- * call has completed), will have the updated stats.
- * 
- * All tables share the same HBase connection for a given connection and each connection
- * will have it's own cache for these stats. This isn't ideal and will get reworked when
- * the schema is kept on the server side. It's ok for now because:
- * 1) we only ask the server for these stats when the start/end region is queried against
- * 2) the query to get the stats pulls a single row so it's very cheap
- * 3) it's async and if it takes too long it won't lead to anything except less optimal
- *  parallelization.
- *
- * 
- * @since 0.1
- */
-public class StatsManagerImpl implements StatsManager {
-    private final ConnectionQueryServices services;
-    private final int statsUpdateFrequencyMs;
-    private final int maxStatsAgeMs;
-    private final TimeKeeper timeKeeper;
-    private final ConcurrentMap<String,PTableStats> tableStatsMap = new ConcurrentHashMap<String,PTableStats>();
-
-    public StatsManagerImpl(ConnectionQueryServices services, int statsUpdateFrequencyMs, int maxStatsAgeMs) {
-        this(services, statsUpdateFrequencyMs, maxStatsAgeMs, TimeKeeper.SYSTEM);
-    }
-    
-    public StatsManagerImpl(ConnectionQueryServices services, int statsUpdateFrequencyMs, int maxStatsAgeMs, TimeKeeper timeKeeper) {
-        this.services = services;
-        this.statsUpdateFrequencyMs = statsUpdateFrequencyMs;
-        this.maxStatsAgeMs = maxStatsAgeMs;
-        this.timeKeeper = timeKeeper;
-    }
-    
-    public long getStatsUpdateFrequency() {
-        return statsUpdateFrequencyMs;
-    }
-    
-    @Override
-    public void updateStats(TableRef tableRef) throws SQLException {
-        SQLException sqlE = null;
-        HTableInterface hTable = services.getTable(tableRef.getTable().getPhysicalName().getBytes());
-        try {
-            byte[] minKey = null, maxKey = null;
-            // Do a key-only scan to get the first row of a table. This is the min
-            // key for the table.
-            Scan scan = new Scan(HConstants.EMPTY_START_ROW, new KeyOnlyFilter());
-            ResultScanner scanner = hTable.getScanner(scan);
-            try {
-                Result r = scanner.next(); 
-                if (r != null) {
-                    minKey = r.getRow();
-                }
-            } finally {
-                scanner.close();
-            }
-            
-            // Get max possible key value
-            scan = new Scan();
-            scan.setFilter(new KeyOnlyFilter());
-            scan.setReversed(true);
-            scanner = hTable.getScanner(scan);
-            try {
-                Result r = scanner.next(); 
-                if (r != null) {
-                    maxKey = r.getRow();
-                }
-            } finally {
-                scanner.close();
-            }
-            tableStatsMap.put(tableRef.getTable().getName().getString(), new PTableStats(timeKeeper.getCurrentTime(),minKey,maxKey));
-        } catch (IOException e) {
-            sqlE = ServerUtil.parseServerException(e);
-        } finally {
-            try {
-                hTable.close();
-            } catch (IOException e) {
-                if (sqlE == null) {
-                    sqlE = ServerUtil.parseServerException(e);
-                } else {
-                    sqlE.setNextException(ServerUtil.parseServerException(e));
-                }
-            } finally {
-                if (sqlE != null) {
-                    throw sqlE;
-                }
-            }
-        }
-    }
-    
-    private PTableStats getStats(final TableRef table) {
-        PTableStats stats = tableStatsMap.get(table);
-        if (stats == null) {
-            PTableStats newStats = new PTableStats();
-            stats = tableStatsMap.putIfAbsent(table.getTable().getName().getString(), newStats);
-            stats = stats == null ? newStats : stats;
-        }
-        // Synchronize on the current stats for a table to prevent
-        // multiple attempts to update the stats.
-        synchronized (stats) {
-            long initiatedTime = stats.getInitiatedTime();
-            long currentTime = timeKeeper.getCurrentTime();
-            // Update stats asynchronously if they haven't been updated within the specified frequency.
-            // We update asynchronously because we don't ever want to block the caller - instead we'll continue
-            // to use the old one.
-            if ( currentTime - initiatedTime >= getStatsUpdateFrequency()) {
-                stats.setInitiatedTime(currentTime);
-                services.getExecutor().submit(new Callable<Void>() {
-
-                    @Override
-                    public Void call() throws Exception { // TODO: will exceptions be logged?
-                        updateStats(table);
-                        return null;
-                    }
-                    
-                });
-            }
-            // If the stats are older than the max age, use an empty stats
-            if (currentTime - stats.getCompletedTime() >= maxStatsAgeMs) {
-                return PTableStats.NO_STATS;
-            }
-        }
-        return stats;
-    }
-    
-    @Override
-    public byte[] getMinKey(TableRef table) {
-        PTableStats stats = getStats(table);
-        return stats.getMinKey();
-    }
-
-    @Override
-    public byte[] getMaxKey(TableRef table) {
-        PTableStats stats = getStats(table);
-        return stats.getMaxKey();
-    }
-
-    private static class PTableStats {
-        private static final PTableStats NO_STATS = new PTableStats();
-        private long initiatedTime;
-        private final long completedTime;
-        private final byte[] minKey;
-        private final byte[] maxKey;
-        
-        public PTableStats() {
-            this(-1,null,null);
-        }
-        public PTableStats(long completedTime, byte[] minKey, byte[] maxKey) {
-            this.minKey = minKey;
-            this.maxKey = maxKey;
-            this.completedTime = this.initiatedTime = completedTime;
-        }
-
-        private byte[] getMinKey() {
-            return minKey;
-        }
-
-        private byte[] getMaxKey() {
-            return maxKey;
-        }
-
-        private long getCompletedTime() {
-            return completedTime;
-        }
-
-        private void setInitiatedTime(long initiatedTime) {
-            this.initiatedTime = initiatedTime;
-        }
-
-        private long getInitiatedTime() {
-            return initiatedTime;
-        }
-    }
-    
-    @Override
-    public void clearStats() throws SQLException {
-        tableStatsMap.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
index 6b45c5e..82ae309 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -57,8 +58,7 @@ public class StatisticsCollector {
     private Map<String, byte[]> minMap = Maps.newHashMap();
     private Map<String, byte[]> maxMap = Maps.newHashMap();
     private long guidepostDepth;
-    private long byteCount = 0;
-    private Map<String, List<byte[]>> guidePostsMap = Maps.newHashMap();
+    private Map<String, Pair<Integer,List<byte[]>>> guidePostsMap = Maps.newHashMap();
     private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
     protected StatisticsTable statsTable;
     // Ensures that either analyze or compaction happens at any point of time.
@@ -135,7 +135,6 @@ public class StatisticsCollector {
         List<Cell> results = new ArrayList<Cell>();
         boolean hasMore = true;
         while (hasMore) {
-            // Am getting duplicates here. Need to avoid that
             hasMore = scanner.next(results);
             collectStatistics(results);
             count += results.size();
@@ -289,19 +288,21 @@ public class StatisticsCollector {
                 maxMap.put(fam, row);
             }
         }
-        byteCount += kv.getLength();
         // TODO : This can be moved to an interface so that we could collect guide posts in different ways
+        Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam);
+        if (gps == null) {
+            gps = new Pair<Integer,List<byte[]>>(0, Lists.<byte[]>newArrayList());
+            guidePostsMap.put(fam, gps);
+        }
+        int byteCount = gps.getFirst() + kv.getLength();
+        gps.setFirst(byteCount);
         if (byteCount >= guidepostDepth) {
-            if (guidePostsMap.get(fam) != null) {
-                guidePostsMap.get(fam).add(
-                        row);
-            } else {
-                List<byte[]> guidePosts = new ArrayList<byte[]>();
-                guidePosts.add(row);
-                guidePostsMap.put(fam, guidePosts);
+            // Prevent dups
+            List<byte[]> gpsKeys = gps.getSecond();
+            if (gpsKeys.isEmpty() || Bytes.compareTo(row, gpsKeys.get(gpsKeys.size()-1)) > 0) {
+                gpsKeys.add(row);
+                gps.setFirst(0); // Only reset count when adding guidepost
             }
-            // reset the count for the next key
-            byteCount = 0;
         }
     }
 
@@ -317,16 +318,19 @@ public class StatisticsCollector {
 
     public byte[] getGuidePosts(String fam) {
         if (!guidePostsMap.isEmpty()) {
-            List<byte[]> guidePosts = guidePostsMap.get(fam);
-            if (guidePosts != null) {
-                byte[][] array = new byte[guidePosts.size()][];
-                int i = 0;
-                for (byte[] element : guidePosts) {
-                    array[i] = element;
-                    i++;
+            Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam);
+            if (gps != null) {
+                List<byte[]> guidePosts = gps.getSecond();
+                if (!guidePosts.isEmpty()) {
+                    byte[][] array = new byte[guidePosts.size()][];
+                    int i = 0;
+                    for (byte[] element : guidePosts) {
+                        array[i] = element;
+                        i++;
+                    }
+                    PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array);
+                    return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray);
                 }
-                PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array);
-                return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray);
             }
         }
         return null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
index e92d61e..bc769e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ByteUtil;
 
 /**
  * Wrapper to access the statistics table SYSTEM.STATS using the HTable.
@@ -56,7 +57,7 @@ public class StatisticsTable implements Closeable {
         if (table == null) {
             // Map the statics table and the table with which the statistics is
             // associated. This is a workaround
-            HTablePool pool = new HTablePool(conf,1);
+            HTablePool pool = new HTablePool(conf,100);
             //HTable hTable = new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
             HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
             //h.setAutoFlushTo(true);
@@ -130,6 +131,9 @@ public class StatisticsTable implements Closeable {
                 currentTime, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
                 currentTime, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
+        // Add our empty column value so queries behave correctly
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                currentTime, ByteUtil.EMPTY_BYTE_ARRAY);
         mutations.add(put);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index fc79173..e321c9c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -46,7 +46,6 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
 
 import com.google.common.collect.Lists;
@@ -60,6 +59,7 @@ import com.google.common.collect.Lists;
  * @since 0.1
  */
 public class ScanUtil {
+    public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1];
     /*
      * Max length that we fill our key when we turn an inclusive key
      * into a exclusive key.
@@ -68,7 +68,7 @@ public class ScanUtil {
     static {
         Arrays.fill(MAX_FILL_LENGTH_FOR_PREVIOUS_KEY, (byte)-1);
     }
-    public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1];
+    private static final byte[] ZERO_BYTE_ARRAY = new byte[1024];
 
     private ScanUtil() {
     }
@@ -264,7 +264,7 @@ public class ScanUtil {
 
     private static byte[] getKey(RowKeySchema schema, List<List<KeyRange>> slots, int[] slotSpan, Bound bound) {
         if (slots.isEmpty()) {
-            return null;
+            return KeyRange.UNBOUND;
         }
         int[] position = new int[slots.size()];
         int maxLength = 0;
@@ -276,7 +276,7 @@ public class ScanUtil {
         byte[] key = new byte[maxLength];
         int length = setKey(schema, slots, slotSpan, position, bound, key, 0, 0, position.length);
         if (length == 0) {
-            return null;
+            return KeyRange.UNBOUND;
         }
         if (length == maxLength) {
             return key;
@@ -439,9 +439,35 @@ public class ScanUtil {
         return keyRanges;
     }
 
-    public static byte[] nextKey(byte[] key, PTable table, ImmutableBytesWritable ptr) {
+    /**
+     * Converts a partially qualified KeyRange into a KeyRange with a
+     * inclusive lower bound and an exclusive upper bound, widening
+     * as necessary.
+     */
+    public static KeyRange convertToInclusiveExclusiveRange (KeyRange partialRange, RowKeySchema schema, ImmutableBytesWritable ptr) {
+        // Ensure minMaxRange is lower inclusive and upper exclusive, as that's
+        // what we need to intersect against for the HBase scan.
+        byte[] lowerRange = partialRange.getLowerRange();
+        if (!partialRange.lowerUnbound()) {
+            if (!partialRange.isLowerInclusive()) {
+                lowerRange = ScanUtil.nextKey(lowerRange, schema, ptr);
+            }
+        }
+        
+        byte[] upperRange = partialRange.getUpperRange();
+        if (!partialRange.upperUnbound()) {
+            if (partialRange.isUpperInclusive()) {
+                upperRange = ScanUtil.nextKey(upperRange, schema, ptr);
+            }
+        }
+        if (partialRange.getLowerRange() != lowerRange || partialRange.getUpperRange() != upperRange) {
+            partialRange = KeyRange.getKeyRange(lowerRange, upperRange);
+        }
+        return partialRange;
+    }
+    
+    private static byte[] nextKey(byte[] key, RowKeySchema schema, ImmutableBytesWritable ptr) {
         int pos = 0;
-        RowKeySchema schema = table.getRowKeySchema();
         int maxOffset = schema.iterator(key, ptr);
         while (schema.next(ptr, pos, maxOffset) != null) {
             pos++;
@@ -500,6 +526,10 @@ public class ScanUtil {
         }
     }
 
+    public static int getRowKeyOffset(byte[] regionStartKey, byte[] regionEndKey) {
+        return regionStartKey.length > 0 ? regionStartKey.length : regionEndKey.length;
+    }
+    
     private static void setRowKeyOffset(Filter filter, int offset) {
         if (filter instanceof BooleanExpressionFilter) {
             BooleanExpressionFilter boolFilter = (BooleanExpressionFilter)filter;
@@ -566,4 +596,31 @@ public class ScanUtil {
     public static boolean isAnalyzeTable(Scan scan) {
         return scan.getAttribute((BaseScannerRegionObserver.ANALYZE_TABLE)) != null;
     }
+
+    public static boolean crossesPrefixBoundary(byte[] key, byte[] prefixBytes, int prefixLength) {
+        if (key.length < prefixLength) {
+            return true;
+        }
+        if (prefixBytes.length >= prefixLength) {
+            return Bytes.compareTo(prefixBytes, 0, prefixLength, key, 0, prefixLength) != 0;
+        }
+        return hasNonZeroLeadingBytes(key, prefixLength);
+    }
+
+    public static byte[] getPrefix(byte[] startKey, int prefixLength) {
+        // If startKey is at beginning, then our prefix will be a null padded byte array
+        return startKey.length >= prefixLength ? startKey : ByteUtil.EMPTY_BYTE_ARRAY;
+    }
+
+    private static boolean hasNonZeroLeadingBytes(byte[] key, int nBytesToCheck) {
+        if (nBytesToCheck > ZERO_BYTE_ARRAY.length) {
+            do {
+                if (Bytes.compareTo(key, nBytesToCheck - ZERO_BYTE_ARRAY.length, ZERO_BYTE_ARRAY.length, ScanUtil.ZERO_BYTE_ARRAY, 0, ScanUtil.ZERO_BYTE_ARRAY.length) != 0) {
+                    return true;
+                }
+                nBytesToCheck -= ZERO_BYTE_ARRAY.length;
+            } while (nBytesToCheck > ZERO_BYTE_ARRAY.length);
+        }
+        return Bytes.compareTo(key, 0, nBytesToCheck, ZERO_BYTE_ARRAY, 0, nBytesToCheck) != 0;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java
new file mode 100644
index 0000000..be90399
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.ScanUtil;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Unit tests verifying that {@code ScanRanges#intersectScan} narrows a
+ * point-lookup skip scan to only the keys that fall within the given
+ * [startKey, stopKey) boundaries.
+ */
+public class ScanRangesIntersectTest {
+
+    @Test
+    public void testPointLookupIntersect() throws Exception {
+        RowKeySchema schema = schema();
+        int[] slotSpan = ScanUtil.SINGLE_COLUMN_SLOT_SPAN;
+        List<KeyRange> keys = points("a","j","m","z");
+        ScanRanges ranges = ScanRanges.create(schema, Collections.singletonList(keys), slotSpan);
+        // Of the points a/j/m/z, only "j" falls within ["b","l").
+        assertIntersect(ranges, "b", "l", "j");
+        
+    }
+    
+    // Intersects ranges with [lowerRange, upperRange) (null = unbounded) and
+    // asserts the resulting scan's SkipScanFilter contains exactly the expected
+    // point keys, or that the scan is null when no points remain.
+    private static void assertIntersect(ScanRanges ranges, String lowerRange, String upperRange, String... expectedPoints) {
+        List<KeyRange> expectedKeys = points(expectedPoints);
+        Collections.sort(expectedKeys,KeyRange.COMPARATOR);
+        Scan scan = new Scan();
+        scan.setFilter(ranges.getSkipScanFilter());
+        byte[] startKey = lowerRange == null ? KeyRange.UNBOUND : PDataType.VARCHAR.toBytes(lowerRange);
+        byte[] stopKey = upperRange == null ? KeyRange.UNBOUND : PDataType.VARCHAR.toBytes(upperRange);
+        Scan newScan = ranges.intersectScan(scan, startKey, stopKey, 0);
+        if (expectedPoints.length == 0) {
+            // An empty intersection must produce no scan at all.
+            assertNull(newScan);
+        } else {
+            assertNotNull(newScan);
+            SkipScanFilter filter = (SkipScanFilter)newScan.getFilter();
+            assertEquals(expectedKeys, filter.getSlots().get(0));
+        }
+    }
+    
+    // Builds single-key (point) ranges from the given VARCHAR values.
+    private static List<KeyRange> points(String... points) {
+        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(points.length);
+        for (String point : points) {
+            keys.add(KeyRange.getKeyRange(PDataType.VARCHAR.toBytes(point)));
+        }
+        return keys;
+    }
+    
+    // A row key schema with a single non-nullable VARCHAR field.
+    private static RowKeySchema schema() {
+        RowKeySchemaBuilder builder = new RowKeySchemaBuilder(1);
+        builder.addField(new PDatum() {
+            @Override
+            public boolean isNullable() {
+                return false;
+            }
+            @Override
+            public PDataType getDataType() {
+                return PDataType.VARCHAR;
+            }
+            @Override
+            public Integer getMaxLength() {
+                return null;
+            }
+            @Override
+            public Integer getScale() {
+                return null;
+            }
+            @Override
+            public SortOrder getSortOrder() {
+                return SortOrder.getDefault();
+            }
+        }, false, SortOrder.getDefault());
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
index cd88ce7..695c4c9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
@@ -72,7 +72,7 @@ public class ScanRangesTest {
             // incrementing the key too much.
             upperExclusiveKey = ByteUtil.nextKey(upperExclusiveKey);
         }
-        assertEquals(expectedResult, scanRanges.intersect(lowerInclusiveKey,upperExclusiveKey));
+        assertEquals(expectedResult, scanRanges.intersects(lowerInclusiveKey,upperExclusiveKey,0));
     }
 
     @Parameters(name="{0} {2}")

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
index 3c0a952..063728c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
@@ -165,17 +165,22 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest {
                               KeyRange.getKeyRange(startKey2)));
         if (Bytes.compareTo(startKey1, startKey2) > 0) {
             expectedStartKey = startKey2;
-            expectedEndKey = ByteUtil.concat(startKey1, QueryConstants.SEPARATOR_BYTE_ARRAY);
+            expectedEndKey = startKey1;
             Collections.reverse(expectedRanges.get(0));
         } else {
             expectedStartKey = startKey1;
-            expectedEndKey = ByteUtil.concat(startKey2, QueryConstants.SEPARATOR_BYTE_ARRAY);;
+            expectedEndKey = startKey2;
         }
-        assertTrue(Bytes.compareTo(expectedStartKey, startKey) == 0);
-        assertTrue(Bytes.compareTo(expectedEndKey, stopKey) == 0);
+        assertEquals(0,startKey.length);
+        assertEquals(0,stopKey.length);
 
         assertNotNull(filter);
         assertTrue(filter instanceof SkipScanFilter);
+        SkipScanFilter skipScanFilter = (SkipScanFilter)filter;
+        assertEquals(1,skipScanFilter.getSlots().size());
+        assertEquals(2,skipScanFilter.getSlots().get(0).size());
+        assertArrayEquals(expectedStartKey, skipScanFilter.getSlots().get(0).get(0).getLowerRange());
+        assertArrayEquals(expectedEndKey, skipScanFilter.getSlots().get(0).get(1).getLowerRange());
         StatementContext context = plan.getContext();
         ScanRanges scanRanges = context.getScanRanges();
         List<List<KeyRange>> ranges = scanRanges.getRanges();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
index bd19663..032768b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
@@ -1185,9 +1185,8 @@ public class WhereOptimizerTest extends BaseConnectionlessQueryTest {
         StatementContext context = compileStatement(query, binds);
         Scan scan = context.getScan();
         Filter filter = scan.getFilter();
-        assertNotNull(filter);
-        assertTrue(filter instanceof SkipScanFilter);
-        byte[] expectedStartRow = ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId));
+        assertNull(filter);
+        byte[] expectedStartRow = ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId2));
         byte[] expectedStopRow = ByteUtil.concat(ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId2)), QueryConstants.SEPARATOR_BYTE_ARRAY);
         assertArrayEquals(expectedStartRow, scan.getStartRow());
         assertArrayEquals(expectedStopRow, scan.getStopRow());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
index ff31f7c..8ac322f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
@@ -27,6 +27,7 @@ import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.MULTI_CF_NAME;
 import static org.apache.phoenix.util.TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+import static org.apache.phoenix.util.TestUtil.PTSDB3_NAME;
 import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
 import static org.apache.phoenix.util.TestUtil.TABLE_WITH_ARRAY;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -103,6 +104,7 @@ public class BaseConnectionlessQueryTest extends BaseTest {
         ensureTableCreated(getUrl(), ENTITY_HISTORY_TABLE_NAME);
         ensureTableCreated(getUrl(), FUNKY_NAME);
         ensureTableCreated(getUrl(), PTSDB_NAME);
+        ensureTableCreated(getUrl(), PTSDB3_NAME);
         ensureTableCreated(getUrl(), MULTI_CF_NAME);
         ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME);
         ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME);
@@ -110,7 +112,6 @@ public class BaseConnectionlessQueryTest extends BaseTest {
         ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME);
         ensureTableCreated(getUrl(), TABLE_WITH_ARRAY);
         Properties props = new Properties();
-        //props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_TABLE_TIMESTAMP));
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP));
         PhoenixConnection conn = DriverManager.getConnection(PHOENIX_CONNECTIONLESS_JDBC_URL, props).unwrap(PhoenixConnection.class);
         try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
new file mode 100644
index 0000000..fd22e47
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+/**
+ * Connectionless tests asserting the exact EXPLAIN output for a variety of
+ * queries: range scans, skip scans, point lookups, aggregates, ordering and
+ * limits. The queryPlans array alternates between a query (even index) and
+ * its expected plan text (odd index).
+ */
+public class QueryPlanTest extends BaseConnectionlessQueryTest {
+    
+    @Test
+    public void testExplainPlan() throws Exception {
+        String[] queryPlans = new String[] {
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000001','000000000000005') ",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000005'] - ['000000000000001','000000000000008']",
+
+                "SELECT host FROM PTSDB3 WHERE host IN ('na1', 'na2','na3')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 KEYS OVER PTSDB3 [~'na3'] - [~'na1']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY",
+
+                "SELECT host FROM PTSDB WHERE inst IS NULL AND host IS NOT NULL AND date >= to_date('2013-01-01')",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER PTSDB [null,not null]\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY AND DATE >= '2013-01-01 00:00:00.000'",
+
+                // Since inst IS NOT NULL is unbounded, we won't continue optimizing
+                "SELECT host FROM PTSDB WHERE inst IS NOT NULL AND host IS NULL AND date >= to_date('2013-01-01')",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER PTSDB [not null]\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY AND (HOST IS NULL AND DATE >= '2013-01-01 00:00:00.000')",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id = '000000000000002' AND x_integer = 2 AND a_integer < 5 ",
+                "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER ATABLE\n" + 
+                "    SERVER FILTER BY (X_INTEGER = 2 AND A_INTEGER < 5)",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) <= ('000000000000001','000000000000005') ",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000003'] - ['000000000000001','000000000000006']",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id > '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000003','000000000000005') ",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000003','000000000000005'] - [*]\n" + 
+                "    SERVER FILTER BY (ENTITY_ID > '000000000000002' AND ENTITY_ID < '000000000000008')",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id >= '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000000','000000000000005') ",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000002'] - ['000000000000001','000000000000008']",
+
+                "SELECT * FROM atable",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE",
+
+                "SELECT inst,host FROM PTSDB WHERE inst IN ('na1', 'na2','na3') AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 6 RANGES OVER PTSDB ['na1','a','2013-01-01'] - ['na3','b','2013-01-02']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY",
+
+                "SELECT inst,host FROM PTSDB WHERE inst LIKE 'na%' AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 RANGES OVER PTSDB ['na','a','2013-01-01'] - ['nb','b','2013-01-02']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY",
+
+                "SELECT count(*) FROM atable",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
+                "    SERVER AGGREGATE INTO SINGLE ROW",
+
+                "SELECT count(*) FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003            '] - ['000000000000001','004            ']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
+                "    SERVER AGGREGATE INTO SINGLE ROW",
+
+                "SELECT a_string FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003            '] - ['000000000000001','004            ']",
+
+                "SELECT count(1) FROM atable GROUP BY a_string",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" +
+                "CLIENT MERGE SORT",
+
+                "SELECT count(1) FROM atable GROUP BY a_string LIMIT 5",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + 
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" + 
+                "CLIENT MERGE SORT\n" + 
+                "CLIENT 5 ROW LIMIT",
+
+                "SELECT a_string FROM atable ORDER BY a_string DESC LIMIT 3",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + 
+                "    SERVER TOP 3 ROWS SORTED BY [A_STRING DESC]\n" + 
+                "CLIENT MERGE SORT",
+
+                "SELECT count(1) FROM atable GROUP BY a_string,b_string HAVING max(a_string) = 'a'",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT FILTER BY MAX(A_STRING) = 'a'",
+
+                "SELECT count(1) FROM atable WHERE a_integer = 1 GROUP BY ROUND(a_time,'HOUR',2),entity_id HAVING max(a_string) = 'a'",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER FILTER BY A_INTEGER = 1\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [ENTITY_ID, ROUND(A_TIME)]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT FILTER BY MAX(A_STRING) = 'a'",
+
+                "SELECT count(1) FROM atable WHERE a_integer = 1 GROUP BY a_string,b_string HAVING max(a_string) = 'a' ORDER BY b_string",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER FILTER BY A_INTEGER = 1\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT FILTER BY MAX(A_STRING) = 'a'\n" +
+                "CLIENT SORTED BY [B_STRING]",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id != '000000000000002' AND x_integer = 2 AND a_integer < 5 LIMIT 10",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
+                "    SERVER FILTER BY (ENTITY_ID != '000000000000002' AND X_INTEGER = 2 AND A_INTEGER < 5)\n" + 
+                "    SERVER 10 ROW LIMIT\n" + 
+                "CLIENT 10 ROW LIMIT",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string ASC NULLS FIRST LIMIT 10",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
+                "    SERVER TOP 10 ROWS SORTED BY [A_STRING]\n" + 
+                "CLIENT MERGE SORT",
+
+                "SELECT max(a_integer) FROM atable WHERE organization_id = '000000000000001' GROUP BY organization_id,entity_id,ROUND(a_date,'HOUR') ORDER BY entity_id NULLS LAST LIMIT 10",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [ORGANIZATION_ID, ENTITY_ID, ROUND(A_DATE)]\n" + 
+                "CLIENT MERGE SORT\n" + 
+                "CLIENT TOP 10 ROWS SORTED BY [ENTITY_ID NULLS LAST]",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string DESC NULLS LAST LIMIT 10",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
+                "    SERVER TOP 10 ROWS SORTED BY [A_STRING DESC NULLS LAST]\n" + 
+                "CLIENT MERGE SORT",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id IN ('000000000000001', '000000000000005')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 KEYS OVER ATABLE ['000000000000001'] - ['000000000000005']",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id IN ('00D000000000001', '00D000000000005') AND entity_id IN('00E00000000000X','00E00000000000Z')",
+                "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 4 KEYS OVER ATABLE",
+
+                "SELECT inst,host FROM PTSDB WHERE REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1', 'na2','na3')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 RANGES OVER PTSDB ['na1'] - ['na4']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY AND REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1','na2','na3')",
+
+        };
+        // Walk the array two entries at a time: run EXPLAIN on the query and
+        // compare the produced plan against the expected text verbatim.
+        for (int i = 0; i < queryPlans.length; i+=2) {
+            String query = queryPlans[i];
+            String plan = queryPlans[i+1];
+            Properties props = new Properties();
+            // Override date format so we don't have a bunch of zeros
+            props.setProperty(QueryServices.DATE_FORMAT_ATTRIB, "yyyy-MM-dd");
+            Connection conn = DriverManager.getConnection(getUrl(), props);
+            try {
+                Statement statement = conn.createStatement();
+                ResultSet rs = statement.executeQuery("EXPLAIN " + query);
+                // TODO: figure out a way of verifying that query isn't run during explain execution
+                // Prefix the assertion message with the 1-based pair number so a
+                // failure identifies which query mismatched.
+                assertEquals((i/2+1) + ") " + query, plan, QueryUtil.getExplainPlan(rs));
+            } finally {
+                conn.close();
+            }
+        }
+    }
+
+}


Mime
View raw message