Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1F5DC200D0B for ; Tue, 29 Aug 2017 00:21:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1DA791611AF; Mon, 28 Aug 2017 22:21:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E3440160C95 for ; Tue, 29 Aug 2017 00:21:48 +0200 (CEST) Received: (qmail 68134 invoked by uid 500); 28 Aug 2017 22:21:48 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 68125 invoked by uid 99); 28 Aug 2017 22:21:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 28 Aug 2017 22:21:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BF785F32AD; Mon, 28 Aug 2017 22:21:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jamestaylor@apache.org To: commits@phoenix.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: phoenix git commit: PHOENIX-418 Support approximate COUNT DISTINCT (Ethan Wang) Date: Mon, 28 Aug 2017 22:21:47 +0000 (UTC) archived-at: Mon, 28 Aug 2017 22:21:50 -0000 Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 b8595f1af -> cce7ab29a PHOENIX-418 Support approximate COUNT DISTINCT (Ethan Wang) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cce7ab29 Tree: 
http://git-wip-us.apache.org/repos/asf/phoenix/tree/cce7ab29 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cce7ab29 Branch: refs/heads/4.x-HBase-1.1 Commit: cce7ab29a41eb3c0d6e01889e5f5caa2b94bac5c Parents: b8595f1 Author: James Taylor Authored: Mon Aug 28 15:17:37 2017 -0700 Committer: James Taylor Committed: Mon Aug 28 15:20:45 2017 -0700 ---------------------------------------------------------------------- dev/release_files/NOTICE | 8 + phoenix-core/pom.xml | 5 + .../CountDistinctApproximateHyperLogLogIT.java | 154 +++++++++++++++ .../phoenix/expression/ExpressionType.java | 4 +- ...stinctCountHyperLogLogAggregateFunction.java | 192 +++++++++++++++++++ ...tinctCountHyperLogLogAggregateParseNode.java | 39 ++++ 6 files changed, 401 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/dev/release_files/NOTICE ---------------------------------------------------------------------- diff --git a/dev/release_files/NOTICE b/dev/release_files/NOTICE index eed1edc..b7b40c0 100644 --- a/dev/release_files/NOTICE +++ b/dev/release_files/NOTICE @@ -277,3 +277,11 @@ jcip-annotations findbugs-annotations Copyright Stephen Connolly. + +stream-lib + + Copyright 2016 AddThis. + This product includes software developed by AddThis. 
+ This product also includes code adapted from: + - software copyright (c) 2014 The Apache Software Foundation., http://lucene.apache.org/solr/ + - software copyright (c) 2014 The Apache Software Foundation., http://mahout.apache.org/ http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index f7b0593..7ad32af 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -470,5 +470,10 @@ joni ${joni.version} + + com.clearspring.analytics + stream + 2.9.5 + http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java new file mode 100644 index 0000000..3de1509 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.phoenix.schema.ColumnNotFoundException; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; +import java.util.Properties; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.*; + +public class CountDistinctApproximateHyperLogLogIT extends ParallelStatsDisabledIT { + private String tableName; + + @Before + public void generateTableNames() { + tableName = "T_" + generateUniqueName(); + } + + @Test(expected = ColumnNotFoundException.class) + public void testDistinctCountException() throws Exception { + String query = "SELECT APPROX_COUNT_DISTINCT(x) FROM " + tableName; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement statement = conn.prepareStatement(query);) { + prepareTableWithValues(conn, 100); + ResultSet rs = statement.executeQuery(); + } + } + + @Test + public void testDistinctCountOnConstant() throws Exception { + String query = "SELECT APPROX_COUNT_DISTINCT(20) FROM " + tableName; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement statement = conn.prepareStatement(query);) { + prepareTableWithValues(conn, 100); + ResultSet rs = statement.executeQuery(); + + assertTrue(rs.next()); + assertEquals(1, rs.getLong(1)); + assertFalse(rs.next()); + } + } + + @Test + public void testDistinctCountOnSingleColumn() throws Exception { + String query = "SELECT APPROX_COUNT_DISTINCT(i2) FROM " + tableName; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props); + 
PreparedStatement statement = conn.prepareStatement(query);) { + prepareTableWithValues(conn, 100); + ResultSet rs = statement.executeQuery(); + + assertTrue(rs.next()); + assertEquals(10, rs.getLong(1)); + assertFalse(rs.next()); + } + } + + @Test + public void testDistinctCountOnMutlipleColumns() throws Exception { + String query = "SELECT APPROX_COUNT_DISTINCT(i1||i2) FROM " + tableName; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement statement = conn.prepareStatement(query);) { + prepareTableWithValues(conn, 100); + + ResultSet rs = statement.executeQuery(); + + assertTrue(rs.next()); + assertEquals(100, rs.getLong(1)); + assertFalse(rs.next()); + } + } + + @Test + public void testDistinctCountOnjoining() throws Exception { + String query = "SELECT APPROX_COUNT_DISTINCT(a.i1||a.i2||b.i2) FROM " + tableName + " a, " + tableName + + " b where a.i1=b.i1 and a.i2 = b.i2"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try(Connection conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement statement = conn.prepareStatement(query);) { + prepareTableWithValues(conn, 100); + ResultSet rs = statement.executeQuery(); + + assertTrue(rs.next()); + assertEquals(100, rs.getLong(1)); + assertFalse(rs.next()); + } + } + + @Test + public void testDistinctCountPlanExlain() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String query = "explain SELECT APPROX_COUNT_DISTINCT(i1||i2) FROM " + tableName; + + try(Connection conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement statement = conn.prepareStatement(query);) { + prepareTableWithValues(conn, 100); + ResultSet rs = statement.executeQuery(); + + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + " SERVER AGGREGATE INTO SINGLE ROW", + QueryUtil.getExplainPlan(rs)); + } 
+ } + + /** + * Prepare tables with stats updated. format of first table such as i1, i2 + * 1, 10 2, 20 3, 30 ... + * + * @param conn + * @param nRows + * @throws Exception + */ + final private void prepareTableWithValues(final Connection conn, final int nRows) throws Exception { + conn.createStatement().execute("create table " + tableName + "\n" + + " (i1 integer not null, i2 integer not null\n" + " CONSTRAINT pk PRIMARY KEY (i1,i2))"); + + final PreparedStatement stmt = conn.prepareStatement("upsert into " + tableName + " VALUES (?, ?)"); + for (int i = 0; i < nRows; i++) { + stmt.setInt(1, i); + stmt.setInt(2, (i * 10) % 100); + stmt.execute(); + } + conn.commit(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java index b8f68da..4f26e87 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java @@ -130,6 +130,7 @@ import org.apache.phoenix.expression.function.WeekFunction; import org.apache.phoenix.expression.function.YearFunction; import org.apache.phoenix.expression.function.DayOfWeekFunction; import org.apache.phoenix.expression.function.DayOfYearFunction; +import org.apache.phoenix.expression.function.DistinctCountHyperLogLogAggregateFunction; import com.google.common.collect.Maps; @@ -292,7 +293,8 @@ public enum ExpressionType { DefaultValueExpression(DefaultValueExpression.class), ArrayColumnExpression(SingleCellColumnExpression.class), FirstValuesFunction(FirstValuesFunction.class), - LastValuesFunction(LastValuesFunction.class); + LastValuesFunction(LastValuesFunction.class), + 
DistinctCountHyperLogLogAggregateFunction(DistinctCountHyperLogLogAggregateFunction.class); ExpressionType(Class clazz) { this.clazz = clazz; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java new file mode 100644 index 0000000..7e18afd --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.expression.function; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.BaseAggregator; +import org.apache.phoenix.expression.aggregator.DistinctCountClientAggregator; +import org.apache.phoenix.parse.FunctionParseNode.Argument; +import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction; +import org.apache.phoenix.parse.DistinctCountHyperLogLogAggregateParseNode; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.util.ByteUtil; + +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; + +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.tuple.Tuple; + +/** + * Built-in function for Distinct Count Aggregation + * function in approximation. + * This aggregator is implemented using HyperLogLog. + * Please refer to PHOENIX-418 + * https://issues.apache.org/jira/browse/PHOENIX-418 + * + * + * 1, Accuracy is not a customizable input. In HyperLogLog + * the error is proportional to 1/sqrt(m), m is the size of + * the hll hash. Also, this process is irrelevant to runtime + * or space complexity. + * + * 2, The two parameters that are required during HLL initialization, + * i.e., the precision value for the normal set and the precision + * value for the sparse set, are hard coded as static final + * variables. Any change to them requires re-deployment of the + * phoenix server coprocessors.
+ * + */ +@BuiltInFunction(name=DistinctCountHyperLogLogAggregateFunction.NAME, nodeClass=DistinctCountHyperLogLogAggregateParseNode.class, args= {@Argument()} ) +public class DistinctCountHyperLogLogAggregateFunction extends DistinctCountAggregateFunction { + public static final String NAME = "APPROX_COUNT_DISTINCT"; + public static final int NormalSetPrecision = 16; + public static final int SparseSetPrecision = 25; + + public DistinctCountHyperLogLogAggregateFunction() { + } + + public DistinctCountHyperLogLogAggregateFunction(List childExpressions){ + super(childExpressions, null); + } + + public DistinctCountHyperLogLogAggregateFunction(List childExpressions, CountAggregateFunction delegate){ + super(childExpressions, delegate); + } + + @Override + public DistinctCountClientAggregator newClientAggregator() { + return new HyperLogLogClientAggregator(SortOrder.getDefault()); + } + + @Override + public Aggregator newServerAggregator(Configuration conf) { + final Expression child = getAggregatorExpression(); + return new HyperLogLogServerAggregator(child.getSortOrder()){ + @Override + protected PDataType getInputDataType() { + return child.getDataType(); + } + }; + } + + @Override + public Aggregator newServerAggregator(Configuration conf, ImmutableBytesWritable ptr) { + final Expression child = getAggregatorExpression(); + return new HyperLogLogServerAggregator(child.getSortOrder(), ptr) { + @Override + protected PDataType getInputDataType() { + return child.getDataType(); + } + }; + } + + @Override + public String getName() { + return NAME; + } +} + + +/** +* ClientSide HyperLogLogAggregator +* It will be called when server side aggregator has finished +* Method aggregate is called for every new server aggregator returned +* Method evaluate is called when the aggregate is done. 
+* the return of evaluate will be sent back to the user as +* the counted result of expression.evaluate +*/ +class HyperLogLogClientAggregator extends DistinctCountClientAggregator{ + private HyperLogLogPlus hll = new HyperLogLogPlus(DistinctCountHyperLogLogAggregateFunction.NormalSetPrecision, DistinctCountHyperLogLogAggregateFunction.SparseSetPrecision); + + public HyperLogLogClientAggregator(SortOrder sortOrder) { + super(sortOrder); + } + + @Override + public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) { + try { + hll.addAll(HyperLogLogPlus.Builder.build(ByteUtil.copyKeyBytesIfNecessary(ptr))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { + byte[] buffer = new byte[PLong.INSTANCE.getByteSize()]; + PLong.INSTANCE.getCodec().encodeLong(hll.cardinality(), buffer, 0); + ptr.set(buffer); + return true; + } +} + + +/** + * ServerSide HyperLogLogAggregator + * It will be serialized and dispatched to region server + * Method aggregate is called for every new row scanned + * Method evaluate is called when this remote scan is over.
+ * the return of evaluate will be sent back to ClientSideAggregator.aggregate + */ +abstract class HyperLogLogServerAggregator extends BaseAggregator{ + private HyperLogLogPlus hll = new HyperLogLogPlus(DistinctCountHyperLogLogAggregateFunction.NormalSetPrecision, DistinctCountHyperLogLogAggregateFunction.SparseSetPrecision); + protected final ImmutableBytesWritable valueByteArray = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); + + public HyperLogLogServerAggregator(SortOrder sortOrder) { + super(sortOrder); + } + + public HyperLogLogServerAggregator(SortOrder sortOrder, ImmutableBytesWritable ptr) { + this(sortOrder); + if(ptr !=null){ + hll.offer(ptr); + } + } + + @Override + public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) { + hll.offer(ptr); + } + + @Override + public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { + try { + valueByteArray.set(hll.getBytes(), 0, hll.getBytes().length); + ptr.set(ByteUtil.copyKeyBytesIfNecessary(valueByteArray)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return true; + } + + @Override + public final PDataType getDataType() { + return PVarbinary.INSTANCE; + } + + abstract protected PDataType getInputDataType(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java new file mode 100644 index 0000000..2fa6e10 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.parse; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.function.FunctionExpression; +import org.apache.phoenix.expression.function.DistinctCountHyperLogLogAggregateFunction; +import org.apache.phoenix.expression.function.SumAggregateFunction; + + +public class DistinctCountHyperLogLogAggregateParseNode extends DelegateConstantToCountParseNode { + public DistinctCountHyperLogLogAggregateParseNode(String name, List children, BuiltInFunctionInfo info) { + super(name, children, info); + } + + @Override + public FunctionExpression create(List children, StatementContext context) throws SQLException { + return new DistinctCountHyperLogLogAggregateFunction(children, getDelegateFunction(children,context)); + } +}