phoenix-commits mailing list archives

From gr...@apache.org
Subject git commit: PHOENIX-539 Chunked loading of parallel scanning
Date Tue, 08 Jul 2014 19:28:19 GMT
Repository: phoenix
Updated Branches:
  refs/heads/3.0 c46364ed1 -> ec1b76ab2


PHOENIX-539 Chunked loading of parallel scanning

Instead of spooling all data in a table to disk, load it in chunks
as needed, avoiding the potential spooling of large quantities of data to disk.
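
The chunk size used for this is controlled by the new phoenix.query.scanResultChunkSize
setting (default 1000 rows; see the QueryServices/QueryServicesOptions changes below). As a
minimal client-side sketch, assuming connection properties are applied as Phoenix
configuration overrides and using an illustrative JDBC URL (the key can equally be set in
the client-side hbase-site.xml):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    public class ScanChunkSizeExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Corresponds to QueryServices.SCAN_RESULT_CHUNK_SIZE; the shipped default is 1000.
            props.setProperty("phoenix.query.scanResultChunkSize", "5000");
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
            // Scans issued through this connection are loaded in 5000-row chunks.
            conn.close();
        }
    }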


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ec1b76ab
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ec1b76ab
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ec1b76ab

Branch: refs/heads/3.0
Commit: ec1b76ab28880d548c8061a6dcb80a6fe0d97ed6
Parents: c46364e
Author: Gabriel Reid <greid@apache.org>
Authored: Tue Jun 10 07:36:37 2014 +0200
Committer: Gabriel Reid <gabrielr@ngdata.com>
Committed: Tue Jul 8 20:37:30 2014 +0200

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/QueryIT.java     |  11 +-
 .../phoenix/compile/StatementContext.java       |  23 +++
 .../org/apache/phoenix/execute/ScanPlan.java    |  43 ++--
 .../phoenix/iterate/ChunkedResultIterator.java  | 196 +++++++++++++++++++
 .../phoenix/iterate/ParallelIterators.java      |   7 +-
 .../org/apache/phoenix/join/HashJoinInfo.java   |  48 +++--
 .../org/apache/phoenix/query/QueryServices.java |   5 +
 .../phoenix/query/QueryServicesOptions.java     |   3 +
 8 files changed, 298 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
index 867eb2a..f453853 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
@@ -861,7 +861,7 @@ public class QueryIT extends BaseClientManagedTimeIT {
         try {
             PreparedStatement statement = conn.prepareStatement(query);
             statement.setString(1, tenantId);
-            statement.setString(2,ROW4);
+            statement.setString(2, ROW4);
             ResultSet rs = statement.executeQuery();
             assertTrue(rs.next());
             assertEquals(A_VALUE, rs.getString(1));
@@ -879,17 +879,17 @@ public class QueryIT extends BaseClientManagedTimeIT {
             
             byte[] tableName = Bytes.toBytes(ATABLE_NAME);
             admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-            HTable htable = (HTable)conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName);
+            HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName);
             htable.clearRegionCache();
             int nRegions = htable.getRegionLocations().size();
-            admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char)('3' + nextRunCount()))+ ts))); // vary split point with test run
+            admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char) ('3' + nextRunCount())) + ts))); // vary split point with test run
             int retryCount = 0;
             do {
                 Thread.sleep(2000);
                 retryCount++;
                 //htable.clearRegionCache();
            } while (retryCount < 10 && htable.getRegionLocations().size() == nRegions);
-            assertNotEquals(nRegions,htable.getRegionLocations().size());
+            assertNotEquals(nRegions, htable.getRegionLocations().size());
             
             statement.setString(1, tenantId);
             rs = statement.executeQuery();
@@ -906,9 +906,10 @@ public class QueryIT extends BaseClientManagedTimeIT {
             assertEquals(E_VALUE, rs.getString(2));
            assertEquals(1, rs.getLong(3));
             assertFalse(rs.next());
-            
         } finally {
+            if (admin != null) {
             admin.close();
+            }
             conn.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 4c907d6..06d5f89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -91,6 +91,29 @@ public class StatementContext {
         this.whereConditionColumns = new ArrayList<Pair<byte[],byte[]>>();
     }
 
+    /**
+     * Copy constructor where an altered scan can be set.
+     *
+     * @param stmtContext the {@code StatementContext} to be copied
+     * @param scan the customized scan
+     */
+    public StatementContext(StatementContext stmtContext, Scan scan) {
+        this.statement = stmtContext.statement;
+        this.resolver = stmtContext.resolver;
+        this.scan = scan;
+        this.sequences = stmtContext.sequences;
+        this.binds = stmtContext.binds;
+        this.aggregates = stmtContext.aggregates;
+        this.expressions = stmtContext.expressions;
+        this.dateFormat = stmtContext.dateFormat;
+        this.dateFormatter = stmtContext.dateFormatter;
+        this.dateParser = stmtContext.dateParser;
+        this.numberFormat = stmtContext.numberFormat;
+        this.tempPtr = new ImmutableBytesWritable();
+        this.currentTable = stmtContext.currentTable;
+        this.whereConditionColumns = stmtContext.whereConditionColumns;
+    }
+
     public String getDateFormat() {
         return dateFormat;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index fb11b47..a994067 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -18,15 +18,13 @@
 package org.apache.phoenix.execute;
 
 
-import java.sql.SQLException;
-import java.util.List;
-
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.iterate.ChunkedResultIterator;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
@@ -46,23 +44,26 @@ import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ScanUtil;
 
+import java.sql.SQLException;
+import java.util.List;
+
 
 
 /**
- * 
+ *
  * Query plan for a basic table scan
  *
- * 
+ *
  * @since 0.1
  */
 public class ScanPlan extends BasicQueryPlan {
     private List<KeyRange> splits;
     private boolean allowPageFilter;
-    
+
     public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
-        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null, 
+        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
-                    new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+                        buildResultIteratorFactory(context, table, orderBy));
         this.allowPageFilter = allowPageFilter;
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
             int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
@@ -70,12 +71,30 @@ public class ScanPlan extends BasicQueryPlan {
             ScanRegionObserver.serializeIntoScan(context.getScan(), thresholdBytes, limit == null ? -1 : limit, orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize());
         }
     }
-    
+
+    private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
+            TableRef table, OrderBy orderBy) {
+
+        ParallelIteratorFactory spoolingResultIteratorFactory =
+                new SpoolingResultIterator.SpoolingResultIteratorFactory(
+                        context.getConnection().getQueryServices());
+
+        // If we're doing an order by then we need the full result before we can do anything,
+        // so we don't bother chunking it. If we're just doing a simple scan then we chunk
+        // the scan to have a quicker initial response.
+        if (!orderBy.getOrderByExpressions().isEmpty()) {
+            return spoolingResultIteratorFactory;
+        } else {
+            return new ChunkedResultIterator.ChunkedResultIteratorFactory(
+                    spoolingResultIteratorFactory, table);
+        }
+    }
+
     @Override
     public List<KeyRange> getSplits() {
         return splits;
     }
-    
+
     @Override
     protected ResultIterator newIterator() throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be too late afterwards
@@ -96,9 +115,9 @@ public class ScanPlan extends BasicQueryPlan {
         if (isOrdered) {
             scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
         } else {
-            if (isSalted && 
+            if (isSalted &&
                     (context.getConnection().getQueryServices().getProps().getBoolean(
-                            QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, 
+                            QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB,
                             QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE) ||
                      orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY ||
                      orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)) { // ORDER BY was optimized out b/c query is in row key order

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
new file mode 100644
index 0000000..5635114
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.iterate;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
+ * basic scan plans, to avoid loading large quantities of data from HBase in one go.
+ */
+public class ChunkedResultIterator implements PeekingResultIterator {
+
+    private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory;
+    private SingleChunkResultIterator singleChunkResultIterator;
+    private final StatementContext context;
+    private final TableRef tableRef;
+    private Scan scan;
+    private final long chunkSize;
+    private PeekingResultIterator resultIterator;
+
+    public static class ChunkedResultIteratorFactory implements ParallelIterators.ParallelIteratorFactory {
+
+        private final ParallelIterators.ParallelIteratorFactory delegateFactory;
+        private final TableRef tableRef;
+
+        public ChunkedResultIteratorFactory(ParallelIterators.ParallelIteratorFactory
+                delegateFactory, TableRef tableRef) {
+            this.delegateFactory = delegateFactory;
+            this.tableRef = tableRef;
+        }
+
+        @Override
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException {
+            // TODO It doesn't seem right to do this selection here, but it's not currently clear
+            // where a better place is to do it
+            // For a HashJoin the scan can't be restarted where it left off, so we don't use
+            // a ChunkedResultIterator
+            if (HashJoinInfo.isHashJoin(context.getScan())) {
+                return delegateFactory.newIterator(context, scanner);
+            } else {
+                return new ChunkedResultIterator(delegateFactory, context, tableRef,
+                        context.getConnection().getQueryServices().getProps().getLong(
+                                            QueryServices.SCAN_RESULT_CHUNK_SIZE,
+                                            QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
+            }
+        }
+    }
+
+    public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory,
+            StatementContext context, TableRef tableRef, long chunkSize) {
+        this.delegateIteratorFactory = delegateIteratorFactory;
+        this.context = context;
+        this.tableRef = tableRef;
+        this.scan = context.getScan();
+        this.chunkSize = chunkSize;
+    }
+
+    @Override
+    public Tuple peek() throws SQLException {
+        return getResultIterator().peek();
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        return getResultIterator().next();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+    @Override
+    public void close() throws SQLException {
+        if (resultIterator != null) {
+            resultIterator.close();
+        }
+        if (singleChunkResultIterator != null) {
+            singleChunkResultIterator.close();
+        }
+    }
+
+    private PeekingResultIterator getResultIterator() throws SQLException {
+        if (resultIterator == null) {
+            singleChunkResultIterator = new SingleChunkResultIterator(
+                    new TableResultIterator(context, tableRef, scan), chunkSize);
+            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator);
+        } else if (resultIterator.peek() == null && !singleChunkResultIterator.isEndOfStreamReached()) {
+            singleChunkResultIterator.close();
+            try {
+                this.scan = new Scan(scan);
+            } catch (IOException e) {
+                throw new PhoenixIOException(e);
+            }
+            scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), new byte[]{0}));
+            singleChunkResultIterator = new SingleChunkResultIterator(
+                    new TableResultIterator(context, tableRef, scan), chunkSize);
+            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator);
+        }
+        return resultIterator;
+    }
+
+    /**
+     * ResultIterator that runs over a single chunk of results (i.e. a portion of a scan).
+     */
+    private static class SingleChunkResultIterator implements ResultIterator {
+
+        private int rowCount = 0;
+        private boolean endOfStreamReached;
+        private Tuple lastTuple;
+        private final ResultIterator delegate;
+        private final long chunkSize;
+
+        private SingleChunkResultIterator(ResultIterator delegate, long chunkSize) {
+            this.delegate = delegate;
+            this.chunkSize = chunkSize;
+        }
+
+        @Override
+        public Tuple next() throws SQLException {
+            if (isChunkComplete() || isEndOfStreamReached()) {
+                return null;
+            }
+            Tuple next = delegate.next();
+            if (next != null) {
+                lastTuple = next;
+                rowCount++;
+            } else {
+                endOfStreamReached = true;
+            }
+            return next;
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {
+            delegate.explain(planSteps);
+        }
+
+        @Override
+        public void close() throws SQLException {
+            delegate.close();
+        }
+
+        /**
+         * Returns true if the current chunk has been fully iterated over.
+         */
+        public boolean isChunkComplete() {
+            return rowCount == chunkSize;
+        }
+
+        /**
+         * Returns true if the end of all chunks has been reached.
+         */
+        public boolean isEndOfStreamReached() {
+            return endOfStreamReached;
+        }
+
+        /**
+         * Returns the last-encountered key.
+         */
+        public byte[] getLastKey() {
+            ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
+            lastTuple.getKey(keyPtr);
+            return keyPtr.get();
+        }
+    }
+}
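
A note on the restart mechanics above: when a chunk is exhausted but the underlying scan is
not, ChunkedResultIterator copies the previous Scan and sets its start row to the last
returned key with a single zero byte appended, the smallest row key that sorts strictly
after it. A standalone sketch of just that step (the helper class and method names are
hypothetical, for illustration only):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ChunkResumeSketch {
        // Build the Scan for the next chunk: same attributes as the previous one, but
        // starting immediately after lastKey (start rows are inclusive, hence the 0x00).
        public static Scan nextChunkScan(Scan previous, byte[] lastKey) throws IOException {
            Scan next = new Scan(previous);
            next.setStartRow(Bytes.add(lastKey, new byte[] { 0 }));
            return next;
        }
    }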

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 804db96..34373bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.*;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
@@ -247,12 +248,14 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
                         @Override
                         public PeekingResultIterator call() throws Exception {
                             // TODO: different HTableInterfaces for each thread or the same is better?
+
+                            StatementContext scanContext = new StatementContext(context, splitScan);
                         	long startTime = System.currentTimeMillis();
-                            ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan);
+                            ResultIterator scanner = new TableResultIterator(scanContext, tableRef, splitScan);
                             if (logger.isDebugEnabled()) {
                             	logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
                             }
-                            return iteratorFactory.newIterator(context, scanner);
+                            return iteratorFactory.newIterator(scanContext, scanner);
                         }
 
                         /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
index 3cbf58f..ce336b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.util.SchemaUtil;
 
 public class HashJoinInfo {
     private static final String HASH_JOIN = "HashJoin";
-    
+
     private KeyValueSchema joinedSchema;
     private ImmutableBytesPtr[] joinIds;
     private List<Expression>[] joinExpressions;
@@ -47,11 +47,11 @@ public class HashJoinInfo {
     private Expression postJoinFilterExpression;
     private Integer limit;
     private boolean forceProjection;
-    
+
     public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection) {
     	this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression, limit, forceProjection);
     }
-    
+
     private static KeyValueSchema[] buildSchemas(PTable[] tables) {
     	KeyValueSchema[] schemas = new KeyValueSchema[tables.length];
     	for (int i = 0; i < tables.length; i++) {
@@ -59,7 +59,7 @@ public class HashJoinInfo {
     	}
     	return schemas;
     }
-    
+
     private static KeyValueSchema buildSchema(PTable table) {
     	KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
     	if (table != null) {
@@ -71,7 +71,7 @@ public class HashJoinInfo {
     	}
         return builder.build();
     }
-    
+
     private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection) {
     	this.joinedSchema = joinedSchema;
     	this.joinIds = joinIds;
@@ -84,43 +84,43 @@ public class HashJoinInfo {
         this.limit = limit;
         this.forceProjection = forceProjection;
     }
-    
+
     public KeyValueSchema getJoinedSchema() {
     	return joinedSchema;
     }
-    
+
     public ImmutableBytesPtr[] getJoinIds() {
         return joinIds;
     }
-    
+
     public List<Expression>[] getJoinExpressions() {
         return joinExpressions;
     }
-    
+
     public JoinType[] getJoinTypes() {
         return joinTypes;
     }
-    
+
     public boolean[] earlyEvaluation() {
     	return earlyEvaluation;
     }
-    
+
     public KeyValueSchema[] getSchemas() {
     	return schemas;
     }
-    
+
     public int[] getFieldPositions() {
     	return fieldPositions;
     }
-    
+
     public Expression getPostJoinFilterExpression() {
         return postJoinFilterExpression;
     }
-    
+
     public Integer getLimit() {
         return limit;
     }
-    
+
     /*
      * If the LHS table is a sub-select, we always do projection, since
      * the ON expressions reference only projected columns.
@@ -128,7 +128,7 @@ public class HashJoinInfo {
     public boolean forceProjection() {
         return forceProjection;
     }
-    
+
     public static void serializeHashJoinIntoScan(Scan scan, HashJoinInfo joinInfo) {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         try {
@@ -166,9 +166,9 @@ public class HashJoinInfo {
                 throw new RuntimeException(e);
             }
         }
-        
+
     }
-    
+
     @SuppressWarnings("unchecked")
     public static HashJoinInfo deserializeHashJoinFromScan(Scan scan) {
         byte[] join = scan.getAttribute(HASH_JOIN);
@@ -196,7 +196,7 @@ public class HashJoinInfo {
                     int expressionOrdinal = WritableUtils.readVInt(input);
                     Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
                     expression.readFields(input);
-                    joinExpressions[i].add(expression);                    
+                    joinExpressions[i].add(expression);
                 }
                 int type = WritableUtils.readVInt(input);
                 joinTypes[i] = JoinType.values()[type];
@@ -225,4 +225,14 @@ public class HashJoinInfo {
         }
     }
 
+    /**
+     * Check if a scan is intended for completing a HashJoin.
+     *
+     * @param scan the scan to be checked
+     * @return {@code true} if the scan is to be used for a HashJoin, otherwise {@code false}
+     */
+    public static boolean isHashJoin(Scan scan) {
+        return scan.getAttribute(HASH_JOIN) != null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 9c751ba..639eaa5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -59,6 +59,11 @@ public interface QueryServices extends SQLCloseable {
 	 */
 	public static final String MAX_SPOOL_TO_DISK_BYTES_ATTRIB = "phoenix.query.maxSpoolToDiskBytes";
     
+    /**
+     * Number of records to read per chunk when streaming records of a basic scan.
+     */
+    public static final String SCAN_RESULT_CHUNK_SIZE = "phoenix.query.scanResultChunkSize";
+
     public static final String MAX_MEMORY_PERC_ATTRIB = "phoenix.query.maxGlobalMemoryPercentage";
     public static final String MAX_MEMORY_WAIT_MS_ATTRIB = "phoenix.query.maxGlobalMemoryWaitMs";
     public static final String MAX_TENANT_MEMORY_PERC_ATTRIB = "phoenix.query.maxTenantMemoryPercentage";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index ca14a9e..108dad4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -46,6 +46,7 @@ import static org.apache.phoenix.query.QueryServices.REGIONSERVER_LEASE_PERIOD_A
 import static org.apache.phoenix.query.QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE;
 import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
@@ -99,6 +100,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
     public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
     public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
+    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 1000L;
     
     // 
     // Spillable GroupBy - SPGBY prefix
@@ -172,6 +174,7 @@ public class QueryServicesOptions {
             .setIfUnset(GROUPBY_MAX_CACHE_SIZE_ATTRIB, DEFAULT_GROUPBY_MAX_CACHE_MAX)
             .setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES)
             .setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE)
+            .setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE)
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set

