From: greid@apache.org
To: commits@phoenix.apache.org
Subject: git commit: PHOENIX-539 Chunked loading of parallel scanning
Date: Tue, 8 Jul 2014 19:28:19 +0000 (UTC)

Repository: phoenix
Updated Branches:
  refs/heads/3.0 c46364ed1 -> ec1b76ab2


PHOENIX-539 Chunked loading of parallel scanning

Instead of spooling all data in a table to disk, which can mean spooling
large quantities of data at once, load it in chunks as needed.

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ec1b76ab
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ec1b76ab
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ec1b76ab

Branch: refs/heads/3.0
Commit: ec1b76ab28880d548c8061a6dcb80a6fe0d97ed6
Parents: c46364e
Author: Gabriel Reid
Authored: Tue Jun 10 07:36:37 2014 +0200
Committer: Gabriel Reid
Committed: Tue Jul 8 20:37:30 2014 +0200

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/QueryIT.java     |  11 +-
 .../phoenix/compile/StatementContext.java       |  23 +++
 .../org/apache/phoenix/execute/ScanPlan.java    |  43 ++--
 .../phoenix/iterate/ChunkedResultIterator.java  | 196 +++++++++++++++++++
 .../phoenix/iterate/ParallelIterators.java      |   7 +-
 .../org/apache/phoenix/join/HashJoinInfo.java   |  48 +++--
 .../org/apache/phoenix/query/QueryServices.java |   5 +
 .../phoenix/query/QueryServicesOptions.java     |   3 +
 8 files changed, 298 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java index 867eb2a..f453853 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java @@ -861,7 +861,7 @@ public class QueryIT extends BaseClientManagedTimeIT { try { PreparedStatement statement = conn.prepareStatement(query); statement.setString(1, tenantId); - statement.setString(2, ROW4); ResultSet rs = statement.executeQuery();
assertTrue(rs.next()); assertEquals(A_VALUE, rs.getString(1)); @@ -879,17 +879,17 @@ public class QueryIT extends BaseClientManagedTimeIT { byte[] tableName = Bytes.toBytes(ATABLE_NAME); admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - HTable htable = (HTable)conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName); + HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName); htable.clearRegionCache(); int nRegions = htable.getRegionLocations().size(); - admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char)('3' + nextRunCount()))+ ts))); // vary split point with test run + admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char) ('3' + nextRunCount())) + ts))); // vary split point with test run int retryCount = 0; do { Thread.sleep(2000); retryCount++; //htable.clearRegionCache(); } while (retryCount < 10 && htable.getRegionLocations().size() == nRegions); - assertNotEquals(nRegions,htable.getRegionLocations().size()); + assertNotEquals(nRegions, htable.getRegionLocations().size()); statement.setString(1, tenantId); rs = statement.executeQuery(); @@ -906,9 +906,10 @@ public class QueryIT extends BaseClientManagedTimeIT { assertEquals(E_VALUE, rs.getString(2)); assertEquals(1, rs.getLong(3)); assertFalse(rs.next()); - } finally { + if (admin != null) { admin.close(); + } conn.close(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index 4c907d6..06d5f89 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -91,6 +91,29 @@ public class StatementContext { this.whereConditionColumns = new ArrayList>(); } + /** + * Copy constructor where an altered scan can be set. 
+ * + * @param stmtContext the {@code StatementContext} to be copied + * @param scan the customized scan + */ + public StatementContext(StatementContext stmtContext, Scan scan) { + this.statement = stmtContext.statement; + this.resolver = stmtContext.resolver; + this.scan = scan; + this.sequences = stmtContext.sequences; + this.binds = stmtContext.binds; + this.aggregates = stmtContext.aggregates; + this.expressions = stmtContext.expressions; + this.dateFormat = stmtContext.dateFormat; + this.dateFormatter = stmtContext.dateFormatter; + this.dateParser = stmtContext.dateParser; + this.numberFormat = stmtContext.numberFormat; + this.tempPtr = new ImmutableBytesWritable(); + this.currentTable = stmtContext.currentTable; + this.whereConditionColumns = stmtContext.whereConditionColumns; + } + public String getDateFormat() { return dateFormat; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index fb11b47..a994067 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -18,15 +18,13 @@ package org.apache.phoenix.execute; -import java.sql.SQLException; -import java.util.List; - import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.ScanRegionObserver; +import org.apache.phoenix.iterate.ChunkedResultIterator; import org.apache.phoenix.iterate.ConcatResultIterator; import org.apache.phoenix.iterate.LimitingResultIterator; import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator; @@ -46,23 +44,26 @@ import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ScanUtil; +import java.sql.SQLException; +import java.util.List; + /** - * + * * Query plan for a basic table scan * - * + * * @since 0.1 */ public class ScanPlan extends BasicQueryPlan { private List splits; private boolean allowPageFilter; - + public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) { - super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null, + super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null, parallelIteratorFactory != null ? parallelIteratorFactory : - new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices())); + buildResultIteratorFactory(context, table, orderBy)); this.allowPageFilter = allowPageFilter; if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( @@ -70,12 +71,30 @@ public class ScanPlan extends BasicQueryPlan { ScanRegionObserver.serializeIntoScan(context.getScan(), thresholdBytes, limit == null ? 
-1 : limit, orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize()); } } - + + private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, + TableRef table, OrderBy orderBy) { + + ParallelIteratorFactory spoolingResultIteratorFactory = + new SpoolingResultIterator.SpoolingResultIteratorFactory( + context.getConnection().getQueryServices()); + + // If we're doing an order by then we need the full result before we can do anything, + // so we don't bother chunking it. If we're just doing a simple scan then we chunk + // the scan to have a quicker initial response. + if (!orderBy.getOrderByExpressions().isEmpty()) { + return spoolingResultIteratorFactory; + } else { + return new ChunkedResultIterator.ChunkedResultIteratorFactory( + spoolingResultIteratorFactory, table); + } + } + @Override public List getSplits() { return splits; } - + @Override protected ResultIterator newIterator() throws SQLException { // Set any scan attributes before creating the scanner, as it will be too late afterwards @@ -96,9 +115,9 @@ public class ScanPlan extends BasicQueryPlan { if (isOrdered) { scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions()); } else { - if (isSalted && + if (isSalted && (context.getConnection().getQueryServices().getProps().getBoolean( - QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, + QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE) || orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)) { // ORDER BY was optimized out b/c query is in row key order http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java new file mode 100644 index 0000000..5635114 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.iterate; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.exception.PhoenixIOException; +import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.Tuple; + +/** + * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for + * basic scan plans, to avoid loading large quantities of data from HBase in one go. + */ +public class ChunkedResultIterator implements PeekingResultIterator { + + private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory; + private SingleChunkResultIterator singleChunkResultIterator; + private final StatementContext context; + private final TableRef tableRef; + private Scan scan; + private final long chunkSize; + private PeekingResultIterator resultIterator; + + public static class ChunkedResultIteratorFactory implements ParallelIterators.ParallelIteratorFactory { + + private final ParallelIterators.ParallelIteratorFactory delegateFactory; + private final TableRef tableRef; + + public ChunkedResultIteratorFactory(ParallelIterators.ParallelIteratorFactory + delegateFactory, TableRef tableRef) { + this.delegateFactory = delegateFactory; + this.tableRef = tableRef; + } + + @Override + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner) throws SQLException { + // TODO It doesn't seem right to do this selection here, but it's not currently clear + // where a better place is to do it + // For a HashJoin the scan can't be restarted where it left off, so we don't use + // a ChunkedResultIterator + if (HashJoinInfo.isHashJoin(context.getScan())) { + return delegateFactory.newIterator(context, scanner); + } else { + return new ChunkedResultIterator(delegateFactory, context, tableRef, + context.getConnection().getQueryServices().getProps().getLong( + QueryServices.SCAN_RESULT_CHUNK_SIZE, + QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE)); + } + } + } + + public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory, + StatementContext context, TableRef tableRef, long chunkSize) { + this.delegateIteratorFactory = delegateIteratorFactory; + this.context = context; + this.tableRef = tableRef; + this.scan = context.getScan(); + this.chunkSize = chunkSize; + } + + @Override + public Tuple peek() throws SQLException { + return getResultIterator().peek(); + } + + @Override + public Tuple next() throws SQLException { + return getResultIterator().next(); + } + + @Override + public void explain(List planSteps) { + resultIterator.explain(planSteps); + } + + @Override + public void close() throws SQLException { + if (resultIterator != null) { + resultIterator.close(); + } + if (singleChunkResultIterator != null) { + singleChunkResultIterator.close(); + } + } + + private PeekingResultIterator getResultIterator() throws SQLException { + if (resultIterator == null) { + singleChunkResultIterator = new SingleChunkResultIterator( + new TableResultIterator(context, tableRef, scan), chunkSize); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator); + } else if 
(resultIterator.peek() == null && !singleChunkResultIterator.isEndOfStreamReached()) { + singleChunkResultIterator.close(); + try { + this.scan = new Scan(scan); + } catch (IOException e) { + throw new PhoenixIOException(e); + } + scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), new byte[]{0})); + singleChunkResultIterator = new SingleChunkResultIterator( + new TableResultIterator(context, tableRef, scan), chunkSize); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator); + } + return resultIterator; + } + + /** + * ResultIterator that runs over a single chunk of results (i.e. a portion of a scan). + */ + private static class SingleChunkResultIterator implements ResultIterator { + + private int rowCount = 0; + private boolean endOfStreamReached; + private Tuple lastTuple; + private final ResultIterator delegate; + private final long chunkSize; + + private SingleChunkResultIterator(ResultIterator delegate, long chunkSize) { + this.delegate = delegate; + this.chunkSize = chunkSize; + } + + @Override + public Tuple next() throws SQLException { + if (isChunkComplete() || isEndOfStreamReached()) { + return null; + } + Tuple next = delegate.next(); + if (next != null) { + lastTuple = next; + rowCount++; + } else { + endOfStreamReached = true; + } + return next; + } + + @Override + public void explain(List planSteps) { + delegate.explain(planSteps); + } + + @Override + public void close() throws SQLException { + delegate.close(); + } + + /** + * Returns true if the current chunk has been fully iterated over. + */ + public boolean isChunkComplete() { + return rowCount == chunkSize; + } + + /** + * Returns true if the end of all chunks has been reached. + */ + public boolean isEndOfStreamReached() { + return endOfStreamReached; + } + + /** + * Returns the last-encountered key. + */ + public byte[] getLastKey() { + ImmutableBytesWritable keyPtr = new ImmutableBytesWritable(); + lastTuple.getKey(keyPtr); + return keyPtr.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 804db96..34373bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.*; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.job.JobManager.JobCallable; @@ -247,12 +248,14 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { @Override public PeekingResultIterator call() throws Exception { // TODO: different HTableInterfaces for each thread or the same is better? 
+ + StatementContext scanContext = new StatementContext(context, splitScan); long startTime = System.currentTimeMillis(); - ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan); + ResultIterator scanner = new TableResultIterator(scanContext, tableRef, splitScan); if (logger.isDebugEnabled()) { logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan); } - return iteratorFactory.newIterator(context, scanner); + return iteratorFactory.newIterator(scanContext, scanner); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java index 3cbf58f..ce336b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java @@ -36,7 +36,7 @@ import org.apache.phoenix.util.SchemaUtil; public class HashJoinInfo { private static final String HASH_JOIN = "HashJoin"; - + private KeyValueSchema joinedSchema; private ImmutableBytesPtr[] joinIds; private List[] joinExpressions; @@ -47,11 +47,11 @@ public class HashJoinInfo { private Expression postJoinFilterExpression; private Integer limit; private boolean forceProjection; - + public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection) { this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression, limit, forceProjection); } - + private static KeyValueSchema[] buildSchemas(PTable[] tables) { KeyValueSchema[] schemas = new KeyValueSchema[tables.length]; for (int i = 0; i < tables.length; i++) { @@ -59,7 +59,7 @@ public class HashJoinInfo { } return schemas; } - + private static KeyValueSchema buildSchema(PTable table) { KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); if (table != null) { @@ -71,7 +71,7 @@ public class HashJoinInfo { } return builder.build(); } - + private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection) { this.joinedSchema = joinedSchema; this.joinIds = joinIds; @@ -84,43 +84,43 @@ public class HashJoinInfo { this.limit = limit; this.forceProjection = forceProjection; } - + public KeyValueSchema getJoinedSchema() { return joinedSchema; } - + public ImmutableBytesPtr[] getJoinIds() { return joinIds; } - + public List[] getJoinExpressions() { return joinExpressions; } - + public JoinType[] getJoinTypes() { return joinTypes; } - + public boolean[] earlyEvaluation() { return earlyEvaluation; } - + public KeyValueSchema[] getSchemas() { return schemas; } - + public int[] getFieldPositions() { return fieldPositions; } - + public Expression getPostJoinFilterExpression() { return postJoinFilterExpression; } - + public Integer getLimit() { return limit; } - + /* * If the LHS table is a sub-select, we always do projection, since * the ON expressions reference only projected 
columns. @@ -128,7 +128,7 @@ public class HashJoinInfo { public boolean forceProjection() { return forceProjection; } - + public static void serializeHashJoinIntoScan(Scan scan, HashJoinInfo joinInfo) { ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { @@ -166,9 +166,9 @@ public class HashJoinInfo { throw new RuntimeException(e); } } - + } - + @SuppressWarnings("unchecked") public static HashJoinInfo deserializeHashJoinFromScan(Scan scan) { byte[] join = scan.getAttribute(HASH_JOIN); @@ -196,7 +196,7 @@ public class HashJoinInfo { int expressionOrdinal = WritableUtils.readVInt(input); Expression expression = ExpressionType.values()[expressionOrdinal].newInstance(); expression.readFields(input); - joinExpressions[i].add(expression); + joinExpressions[i].add(expression); } int type = WritableUtils.readVInt(input); joinTypes[i] = JoinType.values()[type]; @@ -225,4 +225,14 @@ public class HashJoinInfo { } } + /** + * Check if a scan is intended for completing a HashJoin. + * + * @param scan the scan to be checked + * @return {@code true} if the scan is to be used for a HashJoin, otherwise {@code false} + */ + public static boolean isHashJoin(Scan scan) { + return scan.getAttribute(HASH_JOIN) != null; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 9c751ba..639eaa5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -59,6 +59,11 @@ public interface QueryServices extends SQLCloseable { */ public static final String MAX_SPOOL_TO_DISK_BYTES_ATTRIB = "phoenix.query.maxSpoolToDiskBytes"; + /** + * Number of records to read per chunk when streaming records of a basic scan. 
+ */ + public static final String SCAN_RESULT_CHUNK_SIZE = "phoenix.query.scanResultChunkSize"; + public static final String MAX_MEMORY_PERC_ATTRIB = "phoenix.query.maxGlobalMemoryPercentage"; public static final String MAX_MEMORY_WAIT_MS_ATTRIB = "phoenix.query.maxGlobalMemoryWaitMs"; public static final String MAX_TENANT_MEMORY_PERC_ATTRIB = "phoenix.query.maxTenantMemoryPercentage"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec1b76ab/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index ca14a9e..108dad4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -46,6 +46,7 @@ import static org.apache.phoenix.query.QueryServices.REGIONSERVER_LEASE_PERIOD_A import static org.apache.phoenix.query.QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB; import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB; +import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE; import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB; @@ -99,6 +100,7 @@ public class QueryServicesOptions { public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5; public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000; + public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 1000L; // // Spillable GroupBy - SPGBY prefix @@ -172,6 +174,7 @@ public class QueryServicesOptions { .setIfUnset(GROUPBY_MAX_CACHE_SIZE_ATTRIB, DEFAULT_GROUPBY_MAX_CACHE_MAX) .setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES) .setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE) + .setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE) ; // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set
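
----------------------------------------------------------------------

For readers who want to try the new behaviour, below is a minimal client-side sketch. The property name phoenix.query.scanResultChunkSize and its 1000-row default come from the QueryServices/QueryServicesOptions changes above; the JDBC URL, table name, and the assumption that properties passed to DriverManager.getConnection are picked up by the connection's query services are illustrative only and not part of this commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class ChunkedScanExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Override the chunk size introduced by PHOENIX-539; when unset it falls back to
        // DEFAULT_SCAN_RESULT_CHUNK_SIZE (1000 rows per chunk).
        props.setProperty("phoenix.query.scanResultChunkSize", "5000");

        // Hypothetical connection URL and table; substitute your own.
        Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
        try {
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("SELECT * FROM ATABLE");
            while (rs.next()) {
                // For a plain scan, rows are now fetched one chunk at a time by
                // ChunkedResultIterator. ORDER BY and hash-join queries still go through
                // the spooling iterator, as selected in ScanPlan.buildResultIteratorFactory()
                // and ChunkedResultIteratorFactory.newIterator() above.
                System.out.println(rs.getString(1));
            }
        } finally {
            conn.close();
        }
    }
}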