phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maryann...@apache.org
Subject git commit: PHOENIX-951 Don't push LIMIT as PageFilter for joins
Date Mon, 28 Apr 2014 15:54:03 GMT
Repository: incubator-phoenix
Updated Branches:
  refs/heads/master 40b76b852 -> 2c97b7b64


PHOENIX-951 Don't push LIMIT as PageFilter for joins


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/2c97b7b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/2c97b7b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/2c97b7b6

Branch: refs/heads/master
Commit: 2c97b7b6483973d13860980be854fb9ad0cf774f
Parents: 40b76b8
Author: maryannxue <maryannxue@apache.org>
Authored: Mon Apr 28 11:53:40 2014 -0400
Committer: maryannxue <maryannxue@apache.org>
Committed: Mon Apr 28 11:53:40 2014 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 143 +++++++++++++++++++
 .../apache/phoenix/compile/JoinCompiler.java    |  15 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  24 ++--
 .../coprocessor/HashJoinRegionScanner.java      |  13 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |  34 ++---
 .../org/apache/phoenix/execute/ScanPlan.java    |   6 +-
 .../org/apache/phoenix/join/HashJoinInfo.java   |  16 ++-
 7 files changed, 215 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/2c97b7b6/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 2e0d329..6493a2e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -404,6 +404,38 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "                    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "                    BUILD HASH TABLE 0\n" +
                 "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+                /*
+                 * testJoinWithLimit()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+                 */
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME
+ "\n" +
+                "    SERVER FILTER BY PageFilter 4\n" +
+                "    SERVER 4 ROW LIMIT\n" +
+                "CLIENT 4 ROW LIMIT\n" +
+                "    PARALLEL EQUI-JOIN 2 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_DISPLAY_NAME
+ "\n" +
+                "    BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME
+ "\n" +
+                "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithLimit()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+                 */
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME
+ "\n" +
+                "CLIENT 4 ROW LIMIT\n" +
+                "    PARALLEL EQUI-JOIN 2 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_DISPLAY_NAME
+ "\n" +
+                "    BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME
+ "\n" +
+                "    JOIN-SCANNER 4 ROW LIMIT",
                 }});
         testCases.add(new String[][] {
                 {
@@ -710,6 +742,38 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "                    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "                    BUILD HASH TABLE 0\n" +
                 "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+                /*
+                 * testJoinWithLimit()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+                 */
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME
+ "\n" +
+                "    SERVER FILTER BY PageFilter 4\n" +
+                "    SERVER 4 ROW LIMIT\n" +
+                "CLIENT 4 ROW LIMIT\n" +
+                "    PARALLEL EQUI-JOIN 2 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n"
+
+                "    BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME
+ "\n" +
+                "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithLimit()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
+                 */
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME
+ "\n" +
+                "CLIENT 4 ROW LIMIT\n" +
+                "    PARALLEL EQUI-JOIN 2 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n"
+
+                "    BUILD HASH TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME
+ "\n" +
+                "    JOIN-SCANNER 4 ROW LIMIT",
                 }});
         return testCases;
     }
@@ -3015,6 +3079,85 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
             conn.close();
         }
     }
+    
+    @Test
+    public void testJoinWithLimit() throws Exception {
+        String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM "
+ JOIN_SUPPLIER_TABLE_FULL_NAME + " s LEFT JOIN " 
+                + JOIN_ITEM_TABLE_FULL_NAME + " i ON i.\"supplier_id\" = s.\"supplier_id\"
LEFT JOIN "
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LIMIT
4";
+        String query2 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM "
+ JOIN_SUPPLIER_TABLE_FULL_NAME + " s JOIN " 
+                + JOIN_ITEM_TABLE_FULL_NAME + " i ON i.\"supplier_id\" = s.\"supplier_id\"
JOIN "
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LIMIT
4";
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getString(4), "101 YYY Street");
+            assertEquals(rs.getInt(5), 1000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getString(4), "101 YYY Street");
+            assertEquals(rs.getInt(5), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getInt(5), 5000);
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getInt(5), 0);
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query1);
+            assertEquals(plans[19], QueryUtil.getExplainPlan(rs));
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getString(4), "101 YYY Street");
+            assertEquals(rs.getInt(5), 1000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getString(4), "101 YYY Street");
+            assertEquals(rs.getInt(5), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getInt(5), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getString(4), "606 YYY Street");
+            assertEquals(rs.getInt(5), 2000);
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query2);
+            assertEquals(plans[20], QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/2c97b7b6/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index c494194..7fb2d6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -225,6 +225,7 @@ public class JoinCompiler {
         private final List<ParseNode> postFilters;
         private final List<Table> tables;
         private final List<TableRef> tableRefs;
+        private final boolean allLeftJoin;
         private final boolean hasRightJoin;
         private final List<JoinTable> prefilterAcceptedTables;
         
@@ -234,6 +235,7 @@ public class JoinCompiler {
             this.postFilters = Collections.<ParseNode>emptyList();
             this.tables = Collections.<Table>singletonList(table);
             this.tableRefs = Collections.<TableRef>singletonList(table.getTableRef());
+            this.allLeftJoin = false;
             this.hasRightJoin = false;
             this.prefilterAcceptedTables = Collections.<JoinTable>emptyList();
         }
@@ -245,16 +247,20 @@ public class JoinCompiler {
             this.tables = new ArrayList<Table>();
             this.tableRefs = new ArrayList<TableRef>();
             this.tables.add(table);
+            boolean allLeftJoin = true;
             int lastRightJoinIndex = -1;
             for (int i = 0; i < joinSpecs.size(); i++) {
-                this.tables.addAll(joinSpecs.get(i).getJoinTable().getTables());
-                if (joinSpecs.get(i).getType() == JoinType.Right) {
+                JoinSpec joinSpec = joinSpecs.get(i);
+                this.tables.addAll(joinSpec.getJoinTable().getTables());
+                allLeftJoin = allLeftJoin && joinSpec.getType() == JoinType.Left;
+                if (joinSpec.getType() == JoinType.Right) {
                     lastRightJoinIndex = i;
                 }
             }
             for (Table t : this.tables) {
                 this.tableRefs.add(t.getTableRef());
             }
+            this.allLeftJoin = allLeftJoin;
             this.hasRightJoin = lastRightJoinIndex > -1;
             this.prefilterAcceptedTables = new ArrayList<JoinTable>();
             for (int i = lastRightJoinIndex == -1 ? 0 : lastRightJoinIndex; i < joinSpecs.size();
i++) {
@@ -281,6 +287,10 @@ public class JoinCompiler {
             return tableRefs;
         }
         
+        public boolean isAllLeftJoin() {
+            return allLeftJoin;
+        }
+        
         public SelectStatement getStatement() {
             return select;
         }
@@ -351,7 +361,6 @@ public class JoinCompiler {
          * Returns a boolean vector indicating whether the evaluation of join expressions
          * can be evaluated at an early stage if the input JoinSpec can be taken as a
          * star join. Otherwise returns null.  
-         * @param join the JoinSpec
          * @return a boolean vector for a star join; or null for non star join.
          */
         public boolean[] getStarJoinVector() {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/2c97b7b6/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 8b5cd93..1f39ad9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -127,7 +127,7 @@ public class QueryCompiler {
             JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
             return compileJoinQuery(context, binds, joinTable, false);
         } else {
-            return compileSingleQuery(context, select, binds, parallelIteratorFactory);
+            return compileSingleQuery(context, select, binds, parallelIteratorFactory, true);
         }
     }
     
@@ -144,7 +144,7 @@ public class QueryCompiler {
                 context.setCurrentTable(table.getTableRef());
                 context.setResolver(projectedTable.createColumnResolver());
                 table.projectColumns(context.getScan());
-                return compileSingleQuery(context, subquery, binds, null);
+                return compileSingleQuery(context, subquery, binds, null, true);
             }
             QueryPlan plan = compileSubquery(subquery);
             ProjectedPTableWrapper projectedTable = table.createProjectedTable(plan.getProjector());
@@ -219,9 +219,13 @@ public class QueryCompiler {
             }
             context.setCurrentTable(tableRef);
             context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
-            BasicQueryPlan plan = compileSingleQuery(context, query, binds, parallelIteratorFactory);
+            BasicQueryPlan plan = compileSingleQuery(context, query, binds, parallelIteratorFactory,
joinTable.isAllLeftJoin());
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds,
joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression,
forceProjection);
+            Integer limit = null;
+            if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct()
&& query.getOrderBy().isEmpty()) {
+                limit = LimitCompiler.compile(context, query);
+            }
+            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds,
joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression,
limit, forceProjection);
             return new HashJoinPlan(joinTable.getStatement(), plan, joinInfo, hashExpressions,
joinPlans, clientProjectors);
         }
         
@@ -270,9 +274,13 @@ public class QueryCompiler {
             TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector());
             context.setCurrentTable(rhsTableRef);
             context.setResolver(projectedTable.createColumnResolver());
-            BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, parallelIteratorFactory);
+            BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, parallelIteratorFactory,
type == JoinType.Right);
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds,
new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left},
new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression,
forceProjection);
+            Integer limit = null;
+            if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct()
&& rhs.getOrderBy().isEmpty()) {
+                limit = LimitCompiler.compile(context, rhs);
+            }
+            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds,
new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left},
new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression,
limit, forceProjection);
             return new HashJoinPlan(joinTable.getStatement(), rhsPlan, joinInfo, new List[]
{hashExpressions}, new QueryPlan[] {lhsPlan}, new TupleProjector[] {clientProjector});
         }
         
@@ -287,7 +295,7 @@ public class QueryCompiler {
         return statement.getConnection().getQueryServices().getOptimizer().optimize(statement,
plan);
     }
     
-    protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement
select, List<Object> binds, ParallelIteratorFactory parallelIteratorFactory) throws
SQLException{
+    protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement
select, List<Object> binds, ParallelIteratorFactory parallelIteratorFactory, boolean
allowPageFilter) throws SQLException{
         PhoenixConnection connection = statement.getConnection();
         ColumnResolver resolver = context.getResolver();
         TableRef tableRef = context.getCurrentTable();
@@ -328,7 +336,7 @@ public class QueryCompiler {
         if (select.isAggregate() || select.isDistinct()) {
             return new AggregatePlan(context, select, tableRef, projector, limit, orderBy,
parallelIteratorFactory, groupBy, having);
         } else {
-            return new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory);
+            return new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory,
allowPageFilter);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/2c97b7b6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 0be219f..47ffce7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -54,6 +54,8 @@ public class HashJoinRegionScanner implements RegionScanner {
     private final HashJoinInfo joinInfo;
     private Queue<Tuple> resultQueue;
     private boolean hasMore;
+    private long count;
+    private long limit;
     private HashCache[] hashCaches;
     private List<Tuple>[] tempTuples;
     private ValueBitSet tempDestBitSet;
@@ -66,11 +68,16 @@ public class HashJoinRegionScanner implements RegionScanner {
         this.joinInfo = joinInfo;
         this.resultQueue = new LinkedList<Tuple>();
         this.hasMore = true;
+        this.count = 0;
+        this.limit = Long.MAX_VALUE;
         if (joinInfo != null) {
             for (JoinType type : joinInfo.getJoinTypes()) {
                 if (type != JoinType.Inner && type != JoinType.Left)
                     throw new DoNotRetryIOException("Got join type '" + type + "'. Expect
only INNER or LEFT with hash-joins.");
             }
+            if (joinInfo.getLimit() != null) {
+                this.limit = joinInfo.getLimit();
+            }
             int count = joinInfo.getJoinIds().length;
             this.tempTuples = new List[count];
             this.hashCaches = new HashCache[count];
@@ -93,7 +100,7 @@ public class HashJoinRegionScanner implements RegionScanner {
         }
     }
     
-    private void processResults(List<Cell> result, boolean hasLimit) throws IOException
{
+    private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException
{
         if (result.isEmpty())
             return;
         
@@ -106,7 +113,7 @@ public class HashJoinRegionScanner implements RegionScanner {
             return;
         }
         
-        if (hasLimit)
+        if (hasBatchLimit)
             throw new UnsupportedOperationException("Cannot support join operations in scans
with limit");
 
         int count = joinInfo.getJoinIds().length;
@@ -204,7 +211,7 @@ public class HashJoinRegionScanner implements RegionScanner {
         for (int i = 0; i < tuple.size(); i++) {
             results.add(tuple.getValue(i));
         }
-        return resultQueue.isEmpty() ? hasMore : true;
+        return (count++ < limit) && (resultQueue.isEmpty() ? hasMore : true);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/2c97b7b6/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index abd4475..401c15b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -57,14 +57,14 @@ import com.google.common.collect.Lists;
 
 public class HashJoinPlan implements QueryPlan {
     private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
-    
+
     private final FilterableStatement statement;
     private final BasicQueryPlan plan;
     private final HashJoinInfo joinInfo;
     private final List<Expression>[] hashExpressions;
     private final QueryPlan[] hashPlans;
     private final TupleProjector[] clientProjectors;
-    
+
     public HashJoinPlan(FilterableStatement statement, 
             BasicQueryPlan plan, HashJoinInfo joinInfo,
             List<Expression>[] hashExpressions, QueryPlan[] hashPlans, 
@@ -96,11 +96,11 @@ public class HashJoinPlan implements QueryPlan {
     public ResultIterator iterator() throws SQLException {
         ImmutableBytesPtr[] joinIds = joinInfo.getJoinIds();
         assert (joinIds.length == hashExpressions.length && joinIds.length == hashPlans.length);
-        
+
         final HashCacheClient hashClient = new HashCacheClient(plan.getContext().getConnection());
         Scan scan = plan.getContext().getScan();
         final ScanRanges ranges = plan.getContext().getScanRanges();
-        
+
         int count = joinIds.length;
         ConnectionQueryServices services = getContext().getConnection().getQueryServices();
         ExecutorService executor = services.getExecutor();
@@ -144,7 +144,7 @@ public class HashJoinPlan implements QueryPlan {
             } catch (ExecutionException e) {
                 if (firstException == null) {
                     firstException = new SQLException("Encountered exception in hash plan
[" + i + "] execution.", 
-                        e.getCause());
+                            e.getCause());
                 }
             }
         }
@@ -152,12 +152,12 @@ public class HashJoinPlan implements QueryPlan {
             SQLCloseables.closeAllQuietly(dependencies);
             throw firstException;
         }
-        
+
         HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
-        
+
         return plan.iterator(dependencies);
     }
-    
+
     @Override
     public long getEstimatedSize() {
         return plan.getEstimatedSize();
@@ -177,16 +177,19 @@ public class HashJoinPlan implements QueryPlan {
         for (int i = 0; i < count; i++) {
             boolean earlyEvaluation = joinInfo.earlyEvaluation()[i];
             boolean skipMerge = joinInfo.getSchemas()[i].getFieldCount() == 0;
-        	planSteps.add("    BUILD HASH TABLE " + i + (earlyEvaluation ? "" : "(DELAYED EVALUATION)")
+ (skipMerge ? " (SKIP MERGE)" : ""));
-        	List<String> steps = hashPlans[i].getExplainPlan().getPlanSteps();
-        	for (String step : steps) {
-        		planSteps.add("        " + step);
-        	}
+            planSteps.add("    BUILD HASH TABLE " + i + (earlyEvaluation ? "" : "(DELAYED
EVALUATION)") + (skipMerge ? " (SKIP MERGE)" : ""));
+            List<String> steps = hashPlans[i].getExplainPlan().getPlanSteps();
+            for (String step : steps) {
+                planSteps.add("        " + step);
+            }
         }
         if (joinInfo.getPostJoinFilterExpression() != null) {
-        	planSteps.add("    AFTER-JOIN SERVER FILTER BY " + joinInfo.getPostJoinFilterExpression().toString());
+            planSteps.add("    AFTER-JOIN SERVER FILTER BY " + joinInfo.getPostJoinFilterExpression().toString());
         }
-        
+        if (joinInfo.getLimit() != null) {
+            planSteps.add("    JOIN-SCANNER " + joinInfo.getLimit() + " ROW LIMIT");
+        }
+
         return new ExplainPlan(planSteps);
     }
 
@@ -217,7 +220,6 @@ public class HashJoinPlan implements QueryPlan {
 
     @Override
     public boolean isDegenerate() {
-        // TODO can we determine this won't return anything?
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/2c97b7b6/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d09ed34..fb11b47 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -57,11 +57,13 @@ import org.apache.phoenix.util.ScanUtil;
  */
 public class ScanPlan extends BasicQueryPlan {
     private List<KeyRange> splits;
+    private boolean allowPageFilter;
     
-    public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table,
RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory)
{
+    public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table,
RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory,
boolean allowPageFilter) {
         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(),
limit, orderBy, null, 
                 parallelIteratorFactory != null ? parallelIteratorFactory :
                     new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+        this.allowPageFilter = allowPageFilter;
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
             int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
                     QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -89,7 +91,7 @@ public class ScanPlan extends BasicQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        ParallelIterators iterators = new ParallelIterators(context, tableRef, statement,
projection, GroupBy.EMPTY_GROUP_BY, isOrdered ? null : limit, parallelIteratorFactory);
+        ParallelIterators iterators = new ParallelIterators(context, tableRef, statement,
projection, GroupBy.EMPTY_GROUP_BY, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
         splits = iterators.getSplits();
         if (isOrdered) {
             scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/2c97b7b6/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
index 62f8c71..3cbf58f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -45,10 +45,11 @@ public class HashJoinInfo {
     private KeyValueSchema[] schemas;
     private int[] fieldPositions;
     private Expression postJoinFilterExpression;
+    private Integer limit;
     private boolean forceProjection;
     
-    public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[]
joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions,
Expression postJoinFilterExpression, boolean forceProjection) {
-    	this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation,
buildSchemas(tables), fieldPositions, postJoinFilterExpression, forceProjection);
+    public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[]
joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions,
Expression postJoinFilterExpression, Integer limit, boolean forceProjection) {
+    	this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation,
buildSchemas(tables), fieldPositions, postJoinFilterExpression, limit, forceProjection);
     }
     
     private static KeyValueSchema[] buildSchemas(PTable[] tables) {
@@ -71,7 +72,7 @@ public class HashJoinInfo {
         return builder.build();
     }
     
-    private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[]
joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas,
int[] fieldPositions, Expression postJoinFilterExpression, boolean forceProjection) {
+    private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[]
joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas,
int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection)
{
     	this.joinedSchema = joinedSchema;
     	this.joinIds = joinIds;
         this.joinExpressions = joinExpressions;
@@ -80,6 +81,7 @@ public class HashJoinInfo {
         this.schemas = schemas;
         this.fieldPositions = fieldPositions;
         this.postJoinFilterExpression = postJoinFilterExpression;
+        this.limit = limit;
         this.forceProjection = forceProjection;
     }
     
@@ -115,6 +117,10 @@ public class HashJoinInfo {
         return postJoinFilterExpression;
     }
     
+    public Integer getLimit() {
+        return limit;
+    }
+    
     /*
      * If the LHS table is a sub-select, we always do projection, since
      * the ON expressions reference only projected columns.
@@ -148,6 +154,7 @@ public class HashJoinInfo {
             } else {
                 WritableUtils.writeVInt(output, -1);
             }
+            WritableUtils.writeVInt(output, joinInfo.limit == null ? -1 : joinInfo.limit);
             output.writeBoolean(joinInfo.forceProjection);
             scan.setAttribute(HASH_JOIN, stream.toByteArray());
         } catch (IOException e) {
@@ -204,8 +211,9 @@ public class HashJoinInfo {
                 postJoinFilterExpression = ExpressionType.values()[expressionOrdinal].newInstance();
                 postJoinFilterExpression.readFields(input);
             }
+            int limit = WritableUtils.readVInt(input);
             boolean forceProjection = input.readBoolean();
-            return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation,
schemas, fieldPositions, postJoinFilterExpression, forceProjection);
+            return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation,
schemas, fieldPositions, postJoinFilterExpression, limit >= 0 ? limit : null, forceProjection);
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {


Mime
View raw message