From commits-return-20927-archive-asf-public=cust-asf.ponee.io@phoenix.apache.org Tue May 1 20:48:03 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id B4024180675 for ; Tue, 1 May 2018 20:48:02 +0200 (CEST) Received: (qmail 13504 invoked by uid 500); 1 May 2018 18:48:01 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 13491 invoked by uid 99); 1 May 2018 18:48:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 May 2018 18:48:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6540BE78E2; Tue, 1 May 2018 18:48:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jamestaylor@apache.org To: commits@phoenix.apache.org Date: Tue, 01 May 2018 18:48:02 -0000 Message-Id: <64b483c632394570a0e5435f3dc5fbf5@git.apache.org> In-Reply-To: <22dd8b16035549ac9e5f1d360a4ed0cf@git.apache.org> References: <22dd8b16035549ac9e5f1d360a4ed0cf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/4] phoenix git commit: PHOENIX-4718 Decrease overhead of tracking aggregate heap size PHOENIX-4718 Decrease overhead of tracking aggregate heap size Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/08ce089c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/08ce089c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/08ce089c Branch: refs/heads/5.x-HBase-2.0 Commit: 08ce089c3f14d531dc2c78c8b1788b4f93508ce0 Parents: 4e2901c Author: James Taylor Authored: Mon Apr 30 22:03:38 2018 -0700 Committer: James Taylor Committed: Tue May 1 11:46:13 2018 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/SpillableGroupByIT.java | 17 ++-- .../GroupedAggregateRegionObserver.java | 95 ++++++++++---------- .../UngroupedAggregateRegionObserver.java | 48 +++++----- .../phoenix/execute/ClientAggregatePlan.java | 2 +- .../expression/aggregator/Aggregator.java | 9 +- .../expression/aggregator/Aggregators.java | 3 +- .../expression/aggregator/BaseAggregator.java | 4 + .../aggregator/ClientAggregators.java | 3 +- .../DistinctValueWithCountServerAggregator.java | 15 ++-- .../NonSizeTrackingServerAggregators.java | 42 +++++++++ .../aggregator/ServerAggregators.java | 42 +++++---- .../SizeTrackingServerAggregators.java | 59 ++++++++++++ .../org/apache/phoenix/query/QueryServices.java | 1 + .../phoenix/query/QueryServicesOptions.java | 1 + .../phoenix/compile/QueryCompilerTest.java | 4 +- .../phoenix/query/QueryServicesTestImpl.java | 2 + 16 files changed, 231 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java index 3689c4c..21b2ac9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java @@ -53,9 +53,9 @@ public class SpillableGroupByIT extends BaseOwnClusterIT { private static final int NUM_ROWS_INSERTED = 1000; - // covers: COUNT, COUNT(DISTINCT) SUM, AVG, MIN, MAX + // covers: COUNT, SUM, AVG, MIN, MAX private static String GROUPBY1 = "select " - + "count(*), count(distinct uri), sum(appcpu), avg(appcpu), uri, min(id), max(id) from %s " + + "count(*), sum(appcpu), avg(appcpu), uri, min(id), max(id) from %s " + "group by uri"; private static String GROUPBY2 = "select count(distinct uri) from %s"; @@ -135,13 +135,12 @@ public class SpillableGroupByIT extends BaseOwnClusterIT { int count = 0; while (rs.next()) { - String uri = rs.getString(5); + String uri = rs.getString(4); assertEquals(2, rs.getInt(1)); - assertEquals(1, rs.getInt(2)); - assertEquals(20, rs.getInt(3)); - assertEquals(10, rs.getInt(4)); - int a = Integer.valueOf(rs.getString(6)).intValue(); - int b = Integer.valueOf(rs.getString(7)).intValue(); + assertEquals(20, rs.getInt(2)); + assertEquals(10, rs.getInt(3)); + int a = Integer.valueOf(rs.getString(5)).intValue(); + int b = Integer.valueOf(rs.getString(6)).intValue(); assertEquals(Integer.valueOf(uri).intValue(), Math.min(a, b)); assertEquals(NUM_ROWS_INSERTED / 2 + Integer.valueOf(uri), Math.max(a, b)); count++; @@ -206,4 +205,4 @@ public class SpillableGroupByIT extends BaseOwnClusterIT { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 5d89e8e..8c878cb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -132,53 +132,56 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im } List expressions = deserializeGroupByExpressions(expressionBytes, 0); - ServerAggregators aggregators = - ServerAggregators.deserialize(scan - .getAttribute(BaseScannerRegionObserver.AGGREGATORS), c - .getEnvironment().getConfiguration()); - - RegionScanner innerScanner = s; - boolean useProto = false; - byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO); - useProto = localIndexBytes != null; - if (localIndexBytes == null) { - localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); - } - List indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); - TupleProjector tupleProjector = null; - byte[][] viewConstants = null; - ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); - - final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); - final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); - boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); - if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) { - if (dataColumns != null) { - tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); - viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); + final TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), ScanUtil.getTenantId(scan)); + try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) { + ServerAggregators aggregators = + ServerAggregators.deserialize(scan + .getAttribute(BaseScannerRegionObserver.AGGREGATORS), c + .getEnvironment().getConfiguration(), em); + + RegionScanner innerScanner = s; + boolean useProto = false; + byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO); + useProto = localIndexBytes != null; + if (localIndexBytes == null) { + localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); + } + List indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); + TupleProjector tupleProjector = null; + byte[][] viewConstants = null; + ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); + + final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); + final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); + if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) { + if (dataColumns != null) { + tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); + viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); + } + ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); + innerScanner = + getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, + c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); + } + + if (j != null) { + innerScanner = + new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan), + c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier); + } + + long limit = Long.MAX_VALUE; + byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT); + if (limitBytes != null) { + limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault()); + } + if (keyOrdered) { // Optimize by taking advantage that the rows are + // already in the required group by key order + return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit); + } else { // Otherwse, collect them all up in an in memory map + return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit); } - ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); - innerScanner = - getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, - c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); - } - - if (j != null) { - innerScanner = - new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan), - c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier); - } - - long limit = Long.MAX_VALUE; - byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT); - if (limitBytes != null) { - limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault()); - } - if (keyOrdered) { // Optimize by taking advantage that the rows are - // already in the required group by key order - return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit); - } else { // Otherwse, collect them all up in an in memory map - return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index b808530..5743523 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -103,7 +103,6 @@ import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexFailurePolicy; @@ -534,31 +533,33 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); } - Aggregators aggregators = ServerAggregators.deserialize( - scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), conf); - Aggregator[] rowAggregators = aggregators.getAggregators(); boolean hasMore; - boolean hasAny = false; - Pair minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); - Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); - } int rowCount = 0; - final RegionScanner innerScanner = theScanner; - boolean useIndexProto = true; - byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); - // for backward compatiblity fall back to look by the old attribute - if (indexMaintainersPtr == null) { - indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); - useIndexProto = false; - } - - byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + boolean hasAny = false; boolean acquiredLock = false; boolean incrScanRefCount = false; + Aggregators aggregators = null; + Aggregator[] rowAggregators = null; + final RegionScanner innerScanner = theScanner; final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan)); try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) { + aggregators = ServerAggregators.deserialize( + scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), conf, em); + rowAggregators = aggregators.getAggregators(); + Pair minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); + if (logger.isDebugEnabled()) { + logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); + } + boolean useIndexProto = true; + byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); + // for backward compatiblity fall back to look by the old attribute + if (indexMaintainersPtr == null) { + indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); + useIndexProto = false; + } + + byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); if(needToWrite) { synchronized (lock) { if (isRegionClosingOrSplitting) { @@ -571,7 +572,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } region.startRegionOperation(); acquiredLock = true; - long size = 0; synchronized (innerScanner) { do { List results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList(); @@ -803,11 +803,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver commitBatch(region, indexMutations, blockingMemStoreSize); indexMutations.clear(); } - size += aggregators.aggregate(rowAggregators, result); - while(size > em.getSize()) { - logger.info("Request: {}, resizing {} by 1024*1024", size, em.getSize()); - em.resize(em.getSize() + 1024*1024); - } + aggregators.aggregate(rowAggregators, result); hasAny = true; } } while (hasMore); http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java index 22570e4..2db441a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java @@ -88,7 +88,7 @@ public class ClientAggregatePlan extends ClientProcessingPlan { // aggregators. We use the Configuration directly here to avoid the expense of creating // another one. this.serverAggregators = ServerAggregators.deserialize(context.getScan() - .getAttribute(BaseScannerRegionObserver.AGGREGATORS), context.getConnection().getQueryServices().getConfiguration()); + .getAttribute(BaseScannerRegionObserver.AGGREGATORS), context.getConnection().getQueryServices().getConfiguration(), null); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java index e436deb..6e57025 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java @@ -18,7 +18,6 @@ package org.apache.phoenix.expression.aggregator; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - import org.apache.phoenix.expression.Expression; import org.apache.phoenix.schema.tuple.Tuple; @@ -43,4 +42,12 @@ public interface Aggregator extends Expression { * Get the size in bytes */ public int getSize(); + + /** + * Determines whether or not we should track the heap size as + * this aggregator is executing on the server side. + * @return true if the size should be tracked and false + * otherwise. + */ + public boolean trackSize(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java index 2f6e6ee..b1dc658 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java @@ -19,7 +19,6 @@ package org.apache.phoenix.expression.aggregator; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.expression.function.SingleAggregateFunction; -import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.ValueBitSet; @@ -83,7 +82,7 @@ abstract public class Aggregators { * Aggregate over aggregators * @param result the single row Result from scan iteration */ - abstract public long aggregate(Aggregator[] aggregators, Tuple result); + abstract public void aggregate(Aggregator[] aggregators, Tuple result); protected static int calculateSize(Aggregator[] aggregators) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java index 9b673bf..16ce588 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java @@ -63,4 +63,8 @@ public abstract class BaseAggregator extends BaseTerminalExpression implements A return null; } + @Override + public boolean trackSize() { + return false; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java index f1ed2a9..54d5690 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java @@ -50,7 +50,7 @@ public class ClientAggregators extends Aggregators { } @Override - public long aggregate(Aggregator[] aggregators, Tuple result) { + public void aggregate(Aggregator[] aggregators, Tuple result) { TupleUtil.getAggregateValue(result, ptr); tempValueSet.clear(); tempValueSet.or(ptr); @@ -64,7 +64,6 @@ public class ClientAggregators extends Aggregators { } i++; } - return 0; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java index ea7474f..af64900 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java @@ -24,14 +24,13 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.SizedUtil; import org.iq80.snappy.Snappy; @@ -157,11 +156,11 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator { @Override public int getSize() { - // TODO make this size correct.?? - // This size is being called initially at the begin of the scanner open. At that time we any - // way can not tell the exact size of the Map. The Aggregators get size from all Aggregator - // and stores in a variable for future use. This size of the Aggregators is being used in - // Grouped unordered scan. Do we need some changes there in that calculation? return super.getSize() + SizedUtil.ARRAY_SIZE + countMapHeapSize(); } + + @Override + public boolean trackSize() { + return true; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NonSizeTrackingServerAggregators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NonSizeTrackingServerAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NonSizeTrackingServerAggregators.java new file mode 100644 index 0000000..8836c45 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NonSizeTrackingServerAggregators.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.expression.aggregator; + +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.function.SingleAggregateFunction; +import org.apache.phoenix.schema.tuple.Tuple; + +public class NonSizeTrackingServerAggregators extends ServerAggregators { + public static final ServerAggregators EMPTY_AGGREGATORS = new NonSizeTrackingServerAggregators(new SingleAggregateFunction[0], new Aggregator[0], new Expression[0], 0); + + public NonSizeTrackingServerAggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators, + Expression[] expressions, int minNullableIndex) { + super(functions, aggregators, expressions, minNullableIndex); + } + + @Override + public void aggregate(Aggregator[] aggregators, Tuple result) { + for (int i = 0; i < expressions.length; i++) { + if (expressions[i].evaluate(result, ptr) && ptr.getLength() != 0) { + aggregators[i].aggregate(result, ptr); + } + expressions[i].reset(); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java index 790939c..ef9ca0f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java @@ -30,6 +30,9 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.function.SingleAggregateFunction; +import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.tuple.Tuple; @@ -38,11 +41,10 @@ import org.apache.phoenix.schema.tuple.Tuple; * Aggregators that execute on the server-side * */ -public class ServerAggregators extends Aggregators { - public static final ServerAggregators EMPTY_AGGREGATORS = new ServerAggregators(new SingleAggregateFunction[0], new Aggregator[0], new Expression[0], 0); - private final Expression[] expressions; +public abstract class ServerAggregators extends Aggregators { + protected final Expression[] expressions; - private ServerAggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators, Expression[] expressions, int minNullableIndex) { + protected ServerAggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators, Expression[] expressions, int minNullableIndex) { super(functions, aggregators, minNullableIndex); if (aggregators.length != expressions.length) { throw new IllegalArgumentException("Number of aggregators (" + aggregators.length @@ -52,18 +54,7 @@ public class ServerAggregators extends Aggregators { } @Override - public long aggregate(Aggregator[] aggregators, Tuple result) { - long dsize = 0; - for (int i = 0; i < expressions.length; i++) { - if (expressions[i].evaluate(result, ptr) && ptr.getLength() != 0) { - dsize -= aggregators[i].getSize(); - aggregators[i].aggregate(result, ptr); - dsize += aggregators[i].getSize(); - } - expressions[i].reset(); - } - return dsize; - } + public abstract void aggregate(Aggregator[] aggregators, Tuple result); /** * Serialize an Aggregator into a byte array @@ -112,9 +103,9 @@ public class ServerAggregators extends Aggregators { * @param conf Server side configuration used by HBase * @return newly instantiated Aggregators instance */ - public static ServerAggregators deserialize(byte[] b, Configuration conf) { + public static ServerAggregators deserialize(byte[] b, Configuration conf, MemoryChunk chunk) { if (b == null) { - return ServerAggregators.EMPTY_AGGREGATORS; + return NonSizeTrackingServerAggregators.EMPTY_AGGREGATORS; } ByteArrayInputStream stream = new ByteArrayInputStream(b); try { @@ -131,7 +122,20 @@ public class ServerAggregators extends Aggregators { aggregators[i] = aggFunc.getAggregator(); expressions[i] = aggFunc.getAggregatorExpression(); } - return new ServerAggregators(functions, aggregators,expressions, minNullableIndex); + boolean trackSize = false; + if (chunk != null) { + for (Aggregator aggregator : aggregators) { + if (aggregator.trackSize()) { + trackSize = true; + break; + } + } + } + return trackSize ? + new SizeTrackingServerAggregators(functions, aggregators,expressions, minNullableIndex, chunk, + conf.getInt(QueryServices.AGGREGATE_CHUNK_SIZE_INCREASE_ATTRIB, + QueryServicesOptions.DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE)) : + new NonSizeTrackingServerAggregators(functions, aggregators,expressions, minNullableIndex); } catch (IOException e) { throw new RuntimeException(e); } finally { http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/SizeTrackingServerAggregators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/SizeTrackingServerAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/SizeTrackingServerAggregators.java new file mode 100644 index 0000000..983968b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/SizeTrackingServerAggregators.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.expression.aggregator; + +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.function.SingleAggregateFunction; +import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.schema.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SizeTrackingServerAggregators extends ServerAggregators { + private static final Logger logger = LoggerFactory.getLogger(SizeTrackingServerAggregators.class); + + private final MemoryChunk chunk; + private final int sizeIncrease; + private long memoryUsed = 0; + + public SizeTrackingServerAggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators, + Expression[] expressions, int minNullableIndex, MemoryChunk chunk, int sizeIncrease) { + super(functions, aggregators, expressions, minNullableIndex); + this.chunk = chunk; + this.sizeIncrease = sizeIncrease; + } + + @Override + public void aggregate(Aggregator[] aggregators, Tuple result) { + long dsize = memoryUsed; + for (int i = 0; i < expressions.length; i++) { + if (expressions[i].evaluate(result, ptr) && ptr.getLength() != 0) { + dsize -= aggregators[i].getSize(); + aggregators[i].aggregate(result, ptr); + dsize += aggregators[i].getSize(); + } + expressions[i].reset(); + } + while(dsize > chunk.getSize()) { + logger.info("Request: {}, resizing {} by 1024*1024", dsize, chunk.getSize()); + chunk.resize(chunk.getSize() + sizeIncrease); + } + memoryUsed = dsize; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 8cc156c..db0b10b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -103,6 +103,7 @@ public interface QueryServices extends SQLCloseable { public static final String GROUPBY_SPILL_FILES_ATTRIB = "phoenix.groupby.spillFiles"; public static final String GROUPBY_MAX_CACHE_SIZE_ATTRIB = "phoenix.groupby.maxCacheSize"; public static final String GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB = "phoenix.groupby.estimatedDistinctValues"; + public static final String AGGREGATE_CHUNK_SIZE_INCREASE_ATTRIB = "phoenix.aggregate.chunk_size_increase"; public static final String CALL_QUEUE_PRODUCER_ATTRIB_NAME = "CALL_QUEUE_PRODUCER"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 0d846ed..075549f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -161,6 +161,7 @@ public class QueryServicesOptions { public static final int DEFAULT_SCAN_CACHE_SIZE = 1000; public static final int DEFAULT_MAX_INTRA_REGION_PARALLELIZATION = DEFAULT_MAX_QUERY_CONCURRENCY; public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb + public static final int DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE = 1024 * 1024 * 1; // 1 Mb public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 3; public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000; // Only the first chunked batches are fetched in parallel, so this default http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 912682e..243860f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -764,7 +764,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { for (int i = 0; i < queries.length; i++) { query = queries[i]; Scan scan = compileQuery(query, binds); - ServerAggregators aggregators = ServerAggregators.deserialize(scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), null); + ServerAggregators aggregators = ServerAggregators.deserialize(scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), null, null); Aggregator aggregator = aggregators.getAggregators()[0]; assertTrue(aggregator instanceof CountAggregator); } @@ -2431,7 +2431,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { Scan scan = projectQuery("select A.i1 from X group by i1 order by avg(B.i2) " + "desc"); ServerAggregators aggregators = ServerAggregators.deserialize(scan.getAttribute - (BaseScannerRegionObserver.AGGREGATORS), null); + (BaseScannerRegionObserver.AGGREGATORS), null, null); assertEquals(2,aggregators.getAggregatorCount()); } finally { conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/08ce089c/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index a7569f7..e279074 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -67,6 +67,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { public static final long DEFAULT_INDEX_POPULATION_WAIT_TIME = 0; public static final long DEFAULT_SEQUENCE_CACHE_SIZE = 3; public static final boolean DEFAULT_TRANSACTIONS_ENABLED = true; + public static final int DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE = 1000; /* * Effectively disable running the index rebuild task by having an infinite delay * because we want to control it's execution ourselves @@ -122,6 +123,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setMaxThreadsPerHTable(DEFAULT_HTABLE_MAX_THREADS) .setDefaultIndexPopulationWaitTime(DEFAULT_INDEX_POPULATION_WAIT_TIME) .setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY) + .set(AGGREGATE_CHUNK_SIZE_INCREASE_ATTRIB, DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE) // setup default configs for Tephra .set(TxConstants.Manager.CFG_DO_PERSIST, false) .set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times")