phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject phoenix git commit: PHOENIX-418 Support approximate COUNT DISTINCT (Ethan Wang)
Date Mon, 28 Aug 2017 22:21:47 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 b8595f1af -> cce7ab29a


PHOENIX-418 Support approximate COUNT DISTINCT (Ethan Wang)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cce7ab29
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cce7ab29
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cce7ab29

Branch: refs/heads/4.x-HBase-1.1
Commit: cce7ab29a41eb3c0d6e01889e5f5caa2b94bac5c
Parents: b8595f1
Author: James Taylor <jtaylor@salesforce.com>
Authored: Mon Aug 28 15:17:37 2017 -0700
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Mon Aug 28 15:20:45 2017 -0700

----------------------------------------------------------------------
 dev/release_files/NOTICE                        |   8 +
 phoenix-core/pom.xml                            |   5 +
 .../CountDistinctApproximateHyperLogLogIT.java  | 154 +++++++++++++++
 .../phoenix/expression/ExpressionType.java      |   4 +-
 ...stinctCountHyperLogLogAggregateFunction.java | 192 +++++++++++++++++++
 ...tinctCountHyperLogLogAggregateParseNode.java |  39 ++++
 6 files changed, 401 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/dev/release_files/NOTICE
----------------------------------------------------------------------
diff --git a/dev/release_files/NOTICE b/dev/release_files/NOTICE
index eed1edc..b7b40c0 100644
--- a/dev/release_files/NOTICE
+++ b/dev/release_files/NOTICE
@@ -277,3 +277,11 @@ jcip-annotations
 findbugs-annotations
 
   Copyright Stephen Connolly.
+
+stream-lib
+
+  Copyright 2016 AddThis.
+  This product includes software developed by AddThis.
+  This product also includes code adapted from:
+    - software copyright (c) 2014 The Apache Software Foundation., http://lucene.apache.org/solr/
+    - software copyright (c) 2014 The Apache Software Foundation., http://mahout.apache.org/

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index f7b0593..7ad32af 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -470,5 +470,10 @@
         <artifactId>joni</artifactId>
         <version>${joni.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>2.9.5</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java
new file mode 100644
index 0000000..3de1509
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.*;
+
+public class CountDistinctApproximateHyperLogLogIT extends ParallelStatsDisabledIT {
+	private String tableName;
+
+	@Before
+	public void generateTableNames() {
+		tableName = "T_" + generateUniqueName();
+	}
+
+	@Test(expected = ColumnNotFoundException.class)
+	public void testDistinctCountException() throws Exception {
+		String query = "SELECT APPROX_COUNT_DISTINCT(x) FROM " + tableName;
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);
+				PreparedStatement statement = conn.prepareStatement(query);) {
+			prepareTableWithValues(conn, 100);
+			ResultSet rs = statement.executeQuery();
+		}
+	}
+
+	@Test
+	public void testDistinctCountOnConstant() throws Exception {
+		String query = "SELECT APPROX_COUNT_DISTINCT(20) FROM " + tableName;
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);
+				PreparedStatement statement = conn.prepareStatement(query);) {
+			prepareTableWithValues(conn, 100);
+			ResultSet rs = statement.executeQuery();
+
+			assertTrue(rs.next());
+			assertEquals(1, rs.getLong(1));
+			assertFalse(rs.next());
+		}
+	}
+
+	@Test
+	public void testDistinctCountOnSingleColumn() throws Exception {
+		String query = "SELECT APPROX_COUNT_DISTINCT(i2) FROM " + tableName;
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);
+				PreparedStatement statement = conn.prepareStatement(query);) {
+			prepareTableWithValues(conn, 100);
+			ResultSet rs = statement.executeQuery();
+
+			assertTrue(rs.next());
+			assertEquals(10, rs.getLong(1));
+			assertFalse(rs.next());
+		}
+	}
+
+	@Test
+	public void testDistinctCountOnMutlipleColumns() throws Exception {
+		String query = "SELECT APPROX_COUNT_DISTINCT(i1||i2) FROM " + tableName;
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);
+				PreparedStatement statement = conn.prepareStatement(query);) {
+			prepareTableWithValues(conn, 100);
+
+			ResultSet rs = statement.executeQuery();
+
+			assertTrue(rs.next());
+			assertEquals(100, rs.getLong(1));
+			assertFalse(rs.next());
+		}
+	}
+
+	@Test
+	public void testDistinctCountOnjoining() throws Exception {
+		String query = "SELECT APPROX_COUNT_DISTINCT(a.i1||a.i2||b.i2) FROM " + tableName + " a, " + tableName
+				+ " b where a.i1=b.i1 and a.i2 = b.i2";
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		
+		try(Connection conn = DriverManager.getConnection(getUrl(), props);
+			PreparedStatement statement = conn.prepareStatement(query);) {
+			prepareTableWithValues(conn, 100);
+			ResultSet rs = statement.executeQuery();
+
+			assertTrue(rs.next());
+			assertEquals(100, rs.getLong(1));
+			assertFalse(rs.next());
+		}
+	}
+
+	@Test
+	public void testDistinctCountPlanExlain() throws Exception {
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		String query = "explain SELECT APPROX_COUNT_DISTINCT(i1||i2) FROM " + tableName;
+		
+		try(Connection conn = DriverManager.getConnection(getUrl(), props);
+			PreparedStatement statement = conn.prepareStatement(query);) {
+			prepareTableWithValues(conn, 100);
+			ResultSet rs = statement.executeQuery();
+
+			assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n"
+					+ "    SERVER FILTER BY FIRST KEY ONLY\n" + "    SERVER AGGREGATE INTO SINGLE ROW",
+					QueryUtil.getExplainPlan(rs));
+		}
+	}
+
+	/**
+	 * Prepare tables with stats updated. format of first table such as i1, i2
+	 * 1, 10 2, 20 3, 30 ...
+	 * 
+	 * @param conn
+	 * @param nRows
+	 * @throws Exception
+	 */
+	final private void prepareTableWithValues(final Connection conn, final int nRows) throws Exception {
+		conn.createStatement().execute("create table " + tableName + "\n"
+				+ "   (i1 integer not null, i2 integer not null\n" + "    CONSTRAINT pk PRIMARY KEY (i1,i2))");
+
+		final PreparedStatement stmt = conn.prepareStatement("upsert into " + tableName + " VALUES (?, ?)");
+		for (int i = 0; i < nRows; i++) {
+			stmt.setInt(1, i);
+			stmt.setInt(2, (i * 10) % 100);
+			stmt.execute();
+		}
+		conn.commit();
+	}
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index b8f68da..4f26e87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -130,6 +130,7 @@ import org.apache.phoenix.expression.function.WeekFunction;
 import org.apache.phoenix.expression.function.YearFunction;
 import org.apache.phoenix.expression.function.DayOfWeekFunction;
 import org.apache.phoenix.expression.function.DayOfYearFunction;
+import org.apache.phoenix.expression.function.DistinctCountHyperLogLogAggregateFunction;
 
 import com.google.common.collect.Maps;
 
@@ -292,7 +293,8 @@ public enum ExpressionType {
     DefaultValueExpression(DefaultValueExpression.class),
     ArrayColumnExpression(SingleCellColumnExpression.class),
     FirstValuesFunction(FirstValuesFunction.class),
-    LastValuesFunction(LastValuesFunction.class);
+    LastValuesFunction(LastValuesFunction.class),
+    DistinctCountHyperLogLogAggregateFunction(DistinctCountHyperLogLogAggregateFunction.class);
 
     ExpressionType(Class<? extends Expression> clazz) {
         this.clazz = clazz;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java
new file mode 100644
index 0000000..7e18afd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.BaseAggregator;
+import org.apache.phoenix.expression.aggregator.DistinctCountClientAggregator;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.DistinctCountHyperLogLogAggregateParseNode;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.ByteUtil;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Built-in function for Distinct Count Aggregation 
+ * function in approximation. 
+ * This aggregator is implemented using HyperLogLog.
+ * Please refer to PHOENIX-418
+ * https://issues.apache.org/jira/browse/PHOENIX-418
+ * 
+ * 
+ * 1, Accuracy input is not customizable. In HyperLogLog
+ * accuracy is proportional to 1/sqrt(m), m is the size of
+ * the hll hash. Also, this process is irrelevant to runtime
+ * or space complexity.
+ * 
+ * 2, The two parameters that are required during HLL initialization,
+ * i.e., the precision value for the normal set and the precision
+ * value for the sparse set, are hard coded as static final
+ * variables. Any change to them requires re-deployment of the
+ * phoenix server coprocessors.
+ * 
+ */
+@BuiltInFunction(name=DistinctCountHyperLogLogAggregateFunction.NAME, nodeClass=DistinctCountHyperLogLogAggregateParseNode.class, args= {@Argument()} )
+public class DistinctCountHyperLogLogAggregateFunction extends DistinctCountAggregateFunction {
+    public static final String NAME = "APPROX_COUNT_DISTINCT";
+    public static final int NormalSetPrecision = 16;
+    public static final int SparseSetPrecision = 25;
+    
+    public DistinctCountHyperLogLogAggregateFunction() {
+    }
+    
+    public DistinctCountHyperLogLogAggregateFunction(List<Expression> childExpressions){
+        super(childExpressions, null);
+    }
+    
+    public DistinctCountHyperLogLogAggregateFunction(List<Expression> childExpressions, CountAggregateFunction delegate){
+        super(childExpressions, delegate);
+    }
+
+    @Override
+    public DistinctCountClientAggregator newClientAggregator() {
+    	return new HyperLogLogClientAggregator(SortOrder.getDefault());
+    }
+    
+    @Override
+    public Aggregator newServerAggregator(Configuration conf) {
+        final Expression child = getAggregatorExpression();
+        return new HyperLogLogServerAggregator(child.getSortOrder()){
+			@Override
+			protected PDataType getInputDataType() {
+				return child.getDataType();
+			}
+        };
+    }
+    
+    @Override
+    public Aggregator newServerAggregator(Configuration conf, ImmutableBytesWritable ptr) {
+        final Expression child = getAggregatorExpression();
+        return new HyperLogLogServerAggregator(child.getSortOrder(), ptr) {
+          @Override
+          protected PDataType getInputDataType() {
+            return child.getDataType();
+          }
+        };
+    }
+   
+    @Override
+    public String getName() {
+        return NAME;
+    }
+}
+
+
+/**
+* ClientSide HyperLogLogAggregator
+* It will be called when server side aggregator has finished
+* Method aggregate is called for every new server aggregator returned
+* Method evaluate is called when the aggregate is done.
+* the return of evaluate will be sent back to user as 
+* counted result of expression.evaluate
+*/
+class HyperLogLogClientAggregator extends DistinctCountClientAggregator{
+	private HyperLogLogPlus hll = new HyperLogLogPlus(DistinctCountHyperLogLogAggregateFunction.NormalSetPrecision, DistinctCountHyperLogLogAggregateFunction.SparseSetPrecision);
+
+	public HyperLogLogClientAggregator(SortOrder sortOrder) {
+		super(sortOrder);
+	}
+	
+	@Override
+	public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+		try {
+			hll.addAll(HyperLogLogPlus.Builder.build(ByteUtil.copyKeyBytesIfNecessary(ptr)));
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {	
+		byte[] buffer = new byte[PLong.INSTANCE.getByteSize()];
+		PLong.INSTANCE.getCodec().encodeLong(hll.cardinality(), buffer, 0);
+		ptr.set(buffer);
+		return true;
+	}
+}
+
+
+/**
+ * ServerSide HyperLogLogAggregator
+ * It will be serialized and dispatched to region server
+ * Method aggregate is called for every new row scanned
+ * Method evaluate is called when this remote scan is over.
+ * the return of evaluate will be sent back to ClientSideAggregator.aggregate 
+ */
+abstract class HyperLogLogServerAggregator extends BaseAggregator{
+	private HyperLogLogPlus hll = new HyperLogLogPlus(DistinctCountHyperLogLogAggregateFunction.NormalSetPrecision, DistinctCountHyperLogLogAggregateFunction.SparseSetPrecision);
+	protected final ImmutableBytesWritable valueByteArray = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+
+	public HyperLogLogServerAggregator(SortOrder sortOrder) {
+		super(sortOrder);
+	}
+	
+	public HyperLogLogServerAggregator(SortOrder sortOrder, ImmutableBytesWritable ptr) {
+		this(sortOrder);
+		if(ptr !=null){
+			hll.offer(ptr);
+		}
+	}
+
+	@Override
+	public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+		hll.offer(ptr);
+	}
+
+	@Override
+	public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {	
+		try {
+			valueByteArray.set(hll.getBytes(), 0, hll.getBytes().length);
+			ptr.set(ByteUtil.copyKeyBytesIfNecessary(valueByteArray));
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		return true;
+	}
+
+	@Override
+	public final PDataType getDataType() {
+		return PVarbinary.INSTANCE;
+	}
+	
+	abstract protected PDataType getInputDataType();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cce7ab29/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java
new file mode 100644
index 0000000..2fa6e10
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.DistinctCountHyperLogLogAggregateFunction;
+import org.apache.phoenix.expression.function.SumAggregateFunction;
+
+
+public class DistinctCountHyperLogLogAggregateParseNode extends DelegateConstantToCountParseNode {
+    public DistinctCountHyperLogLogAggregateParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+    
+    @Override
+    public FunctionExpression create(List<Expression> children, StatementContext context) throws SQLException {
+        return new DistinctCountHyperLogLogAggregateFunction(children, getDelegateFunction(children,context));
+    }
+}


Mime
View raw message