Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5E9DA200BDC for ; Wed, 14 Dec 2016 08:30:51 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5D132160B36; Wed, 14 Dec 2016 07:30:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 35201160B13 for ; Wed, 14 Dec 2016 08:30:49 +0100 (CET) Received: (qmail 45413 invoked by uid 500); 14 Dec 2016 07:30:47 -0000 Mailing-List: contact commits-help@kylin.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.apache.org Delivered-To: mailing list commits@kylin.apache.org Received: (qmail 45404 invoked by uid 99); 14 Dec 2016 07:30:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Dec 2016 07:30:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 858D3E040F; Wed, 14 Dec 2016 07:30:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: liyang@apache.org To: commits@kylin.apache.org Date: Wed, 14 Dec 2016 07:30:47 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] kylin git commit: KYLIN-1832 HyperLogLog performance optimization archived-at: Wed, 14 Dec 2016 07:30:51 -0000 Repository: kylin Updated Branches: refs/heads/master 530365131 -> e6e330a8b http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java deleted file mode 100644 index 5b7c565..0000000 --- a/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.measure.hll; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; - -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.junit.Assert; -import org.junit.Test; - -/** - * @author yangli9 - * - */ -public class HyperLogLogCounterTest { - - ByteBuffer buf = ByteBuffer.allocate(1024 * 1024); - Random rand1 = new Random(1); - Random rand2 = new Random(2); - Random rand3 = new Random(3); - int errorCount1 = 0; - int errorCount2 = 0; - int errorCount3 = 0; - - @Test - public void testOneAdd() throws IOException { - HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(14); - HyperLogLogPlusCounter one = new HyperLogLogPlusCounter(14); - for (int i = 0; i < 1000000; i++) { - one.clear(); - one.add(rand1.nextInt()); - hllc.merge(one); - } - assertTrue(hllc.getCountEstimate() > 1000000 * 0.9); - } - - @Test - public void testPeekLength() throws IOException { - HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(10); - HyperLogLogPlusCounter copy = new HyperLogLogPlusCounter(10); - byte[] value = new byte[10]; - for (int i = 0; i < 200000; i++) { - rand1.nextBytes(value); - hllc.add(value); - - buf.clear(); - hllc.writeRegisters(buf); - - int len = buf.position(); - buf.position(0); - assertEquals(len, hllc.peekLength(buf)); - - copy.readRegisters(buf); - assertEquals(len, buf.position()); - assertEquals(hllc, copy); - } - buf.clear(); - } - - private Set generateTestData(int n) { - Set testData = new HashSet(); - for (int i = 0; i < n; i++) { - String[] samples = generateSampleData(); - for (String sample : samples) { - testData.add(sample); - } - } - return testData; - } - - // simulate the visit (=visitor+id) - private String[] generateSampleData() { - - StringBuilder buf = new StringBuilder(); - for (int i = 0; i < 19; i++) { - buf.append(Math.abs(rand1.nextInt()) % 10); - } - String header = buf.toString(); - - int size = Math.abs(rand3.nextInt()) % 9 + 1; - String[] samples = new String[size]; - for (int k = 0; k < size; k++) { - buf = new StringBuilder(header); - buf.append("-"); - for (int i = 0; i < 10; i++) { - buf.append(Math.abs(rand3.nextInt()) % 10); - } - samples[k] = buf.toString(); - } - - return samples; - } - - @Test - public void countTest() throws IOException { - int n = 10; - for (int i = 0; i < 5; i++) { - count(n); - n *= 10; - } - } - - private void count(int n) throws IOException { - Set testSet = generateTestData(n); - - HyperLogLogPlusCounter hllc = newHLLC(); - for (String testData : testSet) { - hllc.add(Bytes.toBytes(testData)); - } - long estimate = hllc.getCountEstimate(); - double errorRate = hllc.getErrorRate(); - double actualError = (double) Math.abs(testSet.size() - estimate) / testSet.size(); - System.out.println(estimate); - System.out.println(testSet.size()); - System.out.println(errorRate); - System.out.println("=" + actualError); - Assert.assertTrue(actualError < errorRate * 3.0); - - checkSerialize(hllc); - } - - private void checkSerialize(HyperLogLogPlusCounter hllc) throws IOException { - long estimate = hllc.getCountEstimate(); - buf.clear(); - hllc.writeRegisters(buf); - buf.flip(); - hllc.readRegisters(buf); - Assert.assertEquals(estimate, hllc.getCountEstimate()); - } - - @Test - public void mergeTest() throws IOException { - double error = 0; - int n = 100; - for (int i = 0; i < n; i++) { - double e = merge(i); - error += e; - } - System.out.println("Total average error is " + error / n); - - System.out.println(" errorRateCount1 is " + errorCount1 + "!"); - System.out.println(" errorRateCount2 is " + errorCount2 + "!"); - System.out.println(" errorRateCount3 is " + errorCount3 + "!"); - - Assert.assertTrue(errorCount1 <= n * 0.30); - Assert.assertTrue(errorCount2 <= n * 0.05); - Assert.assertTrue(errorCount3 <= n * 0.02); - } - - private double merge(int round) throws IOException { - int ln = 20; - int dn = 100 * (round + 1); - Set testSet = new HashSet(); - HyperLogLogPlusCounter[] hllcs = new HyperLogLogPlusCounter[ln]; - for (int i = 0; i < ln; i++) { - hllcs[i] = newHLLC(); - for (int k = 0; k < dn; k++) { - String[] samples = generateSampleData(); - for (String data : samples) { - testSet.add(data); - hllcs[i].add(Bytes.toBytes(data)); - } - } - } - HyperLogLogPlusCounter mergeHllc = newHLLC(); - for (HyperLogLogPlusCounter hllc : hllcs) { - mergeHllc.merge(serDes(hllc)); - } - - double errorRate = mergeHllc.getErrorRate(); - long estimate = mergeHllc.getCountEstimate(); - double actualError = Math.abs((double) (testSet.size() - estimate) / testSet.size()); - - System.out.println(testSet.size() + "-" + estimate + " ~ " + actualError); - Assert.assertTrue(actualError < 0.1); - - if (actualError > errorRate) { - errorCount1++; - } - if (actualError > 2 * errorRate) { - errorCount2++; - } - if (actualError > 3 * errorRate) { - errorCount3++; - } - - return actualError; - } - - private HyperLogLogPlusCounter serDes(HyperLogLogPlusCounter hllc) throws IOException { - buf.clear(); - hllc.writeRegisters(buf); - buf.flip(); - HyperLogLogPlusCounter copy = new HyperLogLogPlusCounter(hllc.getPrecision()); - copy.readRegisters(buf); - Assert.assertEquals(copy.getCountEstimate(), hllc.getCountEstimate()); - return copy; - } - - @Test - public void testPerformance() throws IOException { - int N = 3; // reduce N HLLC into one - int M = 1000; // for M times, use 100000 for real perf test - - HyperLogLogPlusCounter samples[] = new HyperLogLogPlusCounter[N]; - for (int i = 0; i < N; i++) { - samples[i] = newHLLC(); - for (String str : generateTestData(10000)) - samples[i].add(str); - } - - System.out.println("Perf test running ... "); - long start = System.currentTimeMillis(); - HyperLogLogPlusCounter sum = newHLLC(); - for (int i = 0; i < M; i++) { - sum.clear(); - for (int j = 0; j < N; j++) { - sum.merge(samples[j]); - checkSerialize(sum); - } - } - long duration = System.currentTimeMillis() - start; - System.out.println("Perf test result: " + duration / 1000 + " seconds"); - } - - @Test - public void testEquivalence() { - byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 }; - byte[] b = new byte[] { 3, 4, 42 }; - HyperLogLogPlusCounter ha = new HyperLogLogPlusCounter(); - HyperLogLogPlusCounter hb = new HyperLogLogPlusCounter(); - ha.add(a, 1, 3); - hb.add(b); - - Assert.assertTrue(ha.getCountEstimate() == hb.getCountEstimate()); - } - - private HyperLogLogPlusCounter newHLLC() { - return new HyperLogLogPlusCounter(16); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/test/java/org/apache/kylin/measure/hll2/HyperLogLogCounterNewTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/hll2/HyperLogLogCounterNewTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/hll2/HyperLogLogCounterNewTest.java new file mode 100644 index 0000000..feb8c8e --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/measure/hll2/HyperLogLogCounterNewTest.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.hll2; + +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterOld; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; +import org.apache.kylin.measure.hllc.RegisterType; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Created by xiefan on 16-12-12. + */ +public class HyperLogLogCounterNewTest { + ByteBuffer buf = ByteBuffer.allocate(1024 * 1024); + Random rand1 = new Random(1); + Random rand2 = new Random(2); + Random rand3 = new Random(3); + int errorCount1 = 0; + int errorCount2 = 0; + int errorCount3 = 0; + + @Test + public void testOneAdd() throws IOException { + HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew(14); + HyperLogLogPlusCounterNew one = new HyperLogLogPlusCounterNew(14); + for (int i = 0; i < 1000000; i++) { + one.clear(); + one.add(rand1.nextInt()); + hllc.merge(one); + } + System.out.println(hllc.getCountEstimate()); + assertTrue(hllc.getCountEstimate() > 1000000 * 0.9); + } + + @Test + public void tesSparseEstimate() throws IOException { + HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew(14); + for (int i = 0; i < 10; i++) { + hllc.add(i); + } + System.out.println(hllc.getCountEstimate()); + assertTrue(hllc.getCountEstimate() > 10 * 0.9); + } + + @Test + public void countTest() throws IOException { + int n = 10; + for (int i = 0; i < 5; i++) { + count(n); + n *= 10; + } + } + + @Test + public void mergeTest() throws IOException { + double error = 0; + int n = 100; + for (int i = 0; i < n; i++) { + double e = merge(i); + error += e; + } + System.out.println("Total average error is " + error / n); + + System.out.println(" errorRateCount1 is " + errorCount1 + "!"); + System.out.println(" errorRateCount2 is " + errorCount2 + "!"); + System.out.println(" errorRateCount3 is " + errorCount3 + "!"); + + Assert.assertTrue(errorCount1 <= n * 0.30); + Assert.assertTrue(errorCount2 <= n * 0.05); + Assert.assertTrue(errorCount3 <= n * 0.02); + } + + /* + compare the result of two different hll counter + */ + @Test + public void compareResult() { + int p = 12; //4096 + int m = 1 << p; + + for (int t = 0; t < 5; t++) { + //compare sparse + HyperLogLogPlusCounterOld oldCounter = new HyperLogLogPlusCounterOld(p); + HyperLogLogPlusCounterNew newCounter = new HyperLogLogPlusCounterNew(p); + + for (int i = 0; i < 20; i++) { + //int r = rand1.nextInt(); + oldCounter.add(i); + newCounter.add(i); + } + assertEquals(RegisterType.SPARSE, newCounter.getRegisterType()); + assertEquals(oldCounter.getCountEstimate(), newCounter.getCountEstimate()); + //compare dense + for (int i = 0; i < m; i++) { + oldCounter.add(i); + newCounter.add(i); + } + assertEquals(RegisterType.DENSE, newCounter.getRegisterType()); + assertEquals(oldCounter.getCountEstimate(), newCounter.getCountEstimate()); + } + + } + + @Test + public void testPeekLength() throws IOException { + HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew(10); + HyperLogLogPlusCounterNew copy = new HyperLogLogPlusCounterNew(10); + byte[] value = new byte[10]; + for (int i = 0; i < 200000; i++) { + rand1.nextBytes(value); + hllc.add(value); + + buf.clear(); + hllc.writeRegisters(buf); + + int len = buf.position(); + buf.position(0); + assertEquals(len, hllc.peekLength(buf)); + + copy.readRegisters(buf); + assertEquals(len, buf.position()); + assertEquals(hllc, copy); + } + buf.clear(); + } + + @Test + public void testEquivalence() { + byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 }; + byte[] b = new byte[] { 3, 4, 42 }; + HyperLogLogPlusCounterNew ha = new HyperLogLogPlusCounterNew(); + HyperLogLogPlusCounterNew hb = new HyperLogLogPlusCounterNew(); + ha.add(a, 1, 3); + hb.add(b); + + Assert.assertTrue(ha.getCountEstimate() == hb.getCountEstimate()); + } + + @Test + public void testAutoChangeToSparse() { + int p = 15; + int m = 1 << p; + HyperLogLogPlusCounterNew counter = new HyperLogLogPlusCounterNew(p); + assertEquals(RegisterType.SPARSE, counter.getRegisterType()); + double over = HyperLogLogPlusCounterNew.overflowFactor * m; + int overFlow = (int) over + 1000; + for (int i = 0; i < overFlow; i++) + counter.add(i); + assertEquals(RegisterType.DENSE, counter.getRegisterType()); + } + + @Test + public void testSerialilze() throws Exception { + //test sparse serialize + int p = 15; + int m = 1 << p; + HyperLogLogPlusCounterNew counter = new HyperLogLogPlusCounterNew(p); + counter.add(123); + assertEquals(RegisterType.SPARSE, counter.getRegisterType()); + checkSerialize(counter); + //test dense serialize + double over = HyperLogLogPlusCounterNew.overflowFactor * m; + int overFlow = (int) over + 1000; + for (int i = 0; i < overFlow; i++) + counter.add(i); + assertEquals(RegisterType.DENSE, counter.getRegisterType()); + checkSerialize(counter); + } + + private Set generateTestData(int n) { + Set testData = new HashSet(); + for (int i = 0; i < n; i++) { + String[] samples = generateSampleData(); + for (String sample : samples) { + testData.add(sample); + } + } + return testData; + } + + // simulate the visit (=visitor+id) + private String[] generateSampleData() { + + StringBuilder buf = new StringBuilder(); + for (int i = 0; i < 19; i++) { + buf.append(Math.abs(rand1.nextInt()) % 10); + } + String header = buf.toString(); + + int size = Math.abs(rand3.nextInt()) % 9 + 1; + String[] samples = new String[size]; + for (int k = 0; k < size; k++) { + buf = new StringBuilder(header); + buf.append("-"); + for (int i = 0; i < 10; i++) { + buf.append(Math.abs(rand3.nextInt()) % 10); + } + samples[k] = buf.toString(); + } + + return samples; + } + + private double merge(int round) throws IOException { + int ln = 20; + int dn = 100 * (round + 1); + Set testSet = new HashSet(); + HyperLogLogPlusCounterNew[] hllcs = new HyperLogLogPlusCounterNew[ln]; + for (int i = 0; i < ln; i++) { + hllcs[i] = newHLLC(); + for (int k = 0; k < dn; k++) { + String[] samples = generateSampleData(); + for (String data : samples) { + testSet.add(data); + hllcs[i].add(Bytes.toBytes(data)); + } + } + } + HyperLogLogPlusCounterNew mergeHllc = newHLLC(); + for (HyperLogLogPlusCounterNew hllc : hllcs) { + mergeHllc.merge(hllc); + } + + double errorRate = mergeHllc.getErrorRate(); + long estimate = mergeHllc.getCountEstimate(); + double actualError = Math.abs((double) (testSet.size() - estimate) / testSet.size()); + + System.out.println(testSet.size() + "-" + estimate + " ~ " + actualError); + Assert.assertTrue(actualError < 0.1); + + if (actualError > errorRate) { + errorCount1++; + } + if (actualError > 2 * errorRate) { + errorCount2++; + } + if (actualError > 3 * errorRate) { + errorCount3++; + } + + return actualError; + } + + private HyperLogLogPlusCounterNew newHLLC() { + return new HyperLogLogPlusCounterNew(16); + } + + private void count(int n) throws IOException { + Set testSet = generateTestData(n); + + HyperLogLogPlusCounterNew hllc = newHLLC(); + for (String testData : testSet) { + hllc.add(Bytes.toBytes(testData)); + } + long estimate = hllc.getCountEstimate(); + double errorRate = hllc.getErrorRate(); + double actualError = (double) Math.abs(testSet.size() - estimate) / testSet.size(); + System.out.println(estimate); + System.out.println(testSet.size()); + System.out.println(errorRate); + System.out.println("=" + actualError); + Assert.assertTrue(actualError < errorRate * 3.0); + + checkSerialize(hllc); + } + + private void checkSerialize(HyperLogLogPlusCounterNew hllc) throws IOException { + long estimate = hllc.getCountEstimate(); + buf.clear(); + hllc.writeRegisters(buf); + buf.flip(); + hllc.readRegisters(buf); + Assert.assertEquals(estimate, hllc.getCountEstimate()); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/test/java/org/apache/kylin/measure/hll2/NewHyperLogLogBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/hll2/NewHyperLogLogBenchmarkTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/hll2/NewHyperLogLogBenchmarkTest.java new file mode 100644 index 0000000..bfb87f9 --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/measure/hll2/NewHyperLogLogBenchmarkTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.hll2; + +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterOld; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; +import org.apache.kylin.measure.hllc.RegisterType; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +/** + * Created by xiefan on 16-12-12. + */ +public class NewHyperLogLogBenchmarkTest { + + public static final Random rand = new Random(1); + + final int testTimes = 10000; + + @Test + public void denseToDenseRegisterMergeBenchmark() throws Exception { + final int p = 15; + int m = 1 << p; + + System.out.println("m : " + m); + double oldFactor = HyperLogLogPlusCounterNew.overflowFactor; + HyperLogLogPlusCounterNew.overflowFactor = 1.1; //keep sparse + for (int cardinality : getTestDataDivide(m)) { + final HyperLogLogPlusCounterOld oldCounter = new HyperLogLogPlusCounterOld(p); + final HyperLogLogPlusCounterOld oldCounter2 = getRandOldCounter(p, cardinality); + long oldTime = runTestCase(new TestCase() { + @Override + public void run() { + + for (int i = 0; i < testTimes; i++) { + oldCounter.merge(oldCounter2); + } + } + }); + final HyperLogLogPlusCounterNew newCounter = new HyperLogLogPlusCounterNew(p, RegisterType.DENSE); + final HyperLogLogPlusCounterNew newCounter2 = new HyperLogLogPlusCounterNew(p, RegisterType.DENSE); + for (int i = 0; i < testTimes; i++) + newCounter2.add(i); + long newTime = runTestCase(new TestCase() { + @Override + public void run() { + for (int i = 0; i < testTimes; i++) { + newCounter.merge(newCounter2); + } + } + }); + assertEquals(RegisterType.DENSE, newCounter.getRegisterType()); + assertEquals(RegisterType.DENSE, newCounter2.getRegisterType()); + System.out.println("----------------------------"); + System.out.println("cardinality : " + cardinality); + System.out.println("old time : " + oldTime); + System.out.println("new time : " + newTime); + } + HyperLogLogPlusCounterNew.overflowFactor = oldFactor; + } + + @Test + public void sparseToSparseMergeBenchmark() throws Exception { + final int p = 15; + int m = 1 << p; + System.out.println("m : " + m); + double oldFactor = HyperLogLogPlusCounterNew.overflowFactor; + HyperLogLogPlusCounterNew.overflowFactor = 1.1; //keep sparse + for (int cardinality : getTestDataDivide(m)) { + final HyperLogLogPlusCounterOld oldCounter = new HyperLogLogPlusCounterOld(p); + final HyperLogLogPlusCounterOld oldCounter2 = getRandOldCounter(p, cardinality); + long oldTime = runTestCase(new TestCase() { + @Override + public void run() { + + for (int i = 0; i < testTimes; i++) { + oldCounter.merge(oldCounter2); + } + } + }); + final HyperLogLogPlusCounterNew newCounter = new HyperLogLogPlusCounterNew(p); + final HyperLogLogPlusCounterNew newCounter2 = getRandNewCounter(p, cardinality); + long newTime = runTestCase(new TestCase() { + @Override + public void run() { + for (int i = 0; i < testTimes; i++) { + newCounter.merge(newCounter2); + } + } + }); + assertEquals(RegisterType.SPARSE, newCounter.getRegisterType()); + assertEquals(RegisterType.SPARSE, newCounter2.getRegisterType()); + System.out.println("----------------------------"); + System.out.println("cardinality : " + cardinality); + System.out.println("old time : " + oldTime); + System.out.println("new time : " + newTime); + } + HyperLogLogPlusCounterNew.overflowFactor = oldFactor; + } + + @Test + public void sparseToDenseRegisterMergeBenchmark() throws Exception { + final int p = 15; + int m = 1 << p; + System.out.println("m : " + m); + double oldFactor = HyperLogLogPlusCounterNew.overflowFactor; + HyperLogLogPlusCounterNew.overflowFactor = 1.1; //keep sparse + for (int cardinality : getTestDataDivide(m)) { + System.out.println("----------------------------"); + System.out.println("cardinality : " + cardinality); + final HyperLogLogPlusCounterOld oldCounter = new HyperLogLogPlusCounterOld(p); + final HyperLogLogPlusCounterOld oldCounter2 = getRandOldCounter(p, cardinality); + long oldTime = runTestCase(new TestCase() { + @Override + public void run() { + for (int i = 0; i < testTimes; i++) { + oldCounter.merge(oldCounter2); + } + } + }); + final HyperLogLogPlusCounterNew newCounter = new HyperLogLogPlusCounterNew(p, RegisterType.DENSE); + final HyperLogLogPlusCounterNew newCounter2 = getRandNewCounter(p, cardinality); + long newTime = runTestCase(new TestCase() { + @Override + public void run() { + for (int i = 0; i < testTimes; i++) { + newCounter.merge(newCounter2); + } + } + }); + assertEquals(RegisterType.DENSE, newCounter.getRegisterType()); + assertEquals(RegisterType.SPARSE, newCounter2.getRegisterType()); + System.out.println("old time : " + oldTime); + System.out.println("new time : " + newTime); + } + HyperLogLogPlusCounterNew.overflowFactor = oldFactor; + } + + @Test + public void sparseSerializeBenchmark() throws Exception { + final int p = 15; + int m = 1 << p; + double oldFactor = HyperLogLogPlusCounterNew.overflowFactor; + HyperLogLogPlusCounterNew.overflowFactor = 1.1; //keep sparse + for (int cardinality : getTestDataDivide(m)) { + System.out.println("----------------------------"); + System.out.println("cardinality : " + cardinality); + final HyperLogLogPlusCounterOld oldCounter = getRandOldCounter(p, cardinality); + long oldTime = runTestCase(new TestCase() { + @Override + public void run() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(1024 * 1024); + long totalBytes = 0; + for (int i = 0; i < testTimes; i++) { + buf.clear(); + oldCounter.writeRegisters(buf); + totalBytes += buf.position(); + buf.flip(); + oldCounter.readRegisters(buf); + } + System.out.println("old serialize bytes : " + totalBytes / testTimes + "B"); + } + }); + final HyperLogLogPlusCounterNew newCounter = getRandNewCounter(p, cardinality); + long newTime = runTestCase(new TestCase() { + @Override + public void run() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(1024 * 1024); + long totalBytes = 0; + for (int i = 0; i < testTimes; i++) { + buf.clear(); + newCounter.writeRegisters(buf); + totalBytes += buf.position(); + buf.flip(); + newCounter.readRegisters(buf); + } + System.out.println("new serialize bytes : " + totalBytes / testTimes + "B"); + } + }); + assertEquals(RegisterType.SPARSE, newCounter.getRegisterType()); + System.out.println("old serialize time : " + oldTime); + System.out.println("new serialize time : " + newTime); + } + HyperLogLogPlusCounterNew.overflowFactor = oldFactor; + } + + @Test + public void denseSerializeBenchmark() throws Exception { + final int p = 15; + int m = 1 << p; + double oldFactor = HyperLogLogPlusCounterNew.overflowFactor; + HyperLogLogPlusCounterNew.overflowFactor = 0; //keep sparse + for (int cardinality : getTestDataDivide(m)) { + System.out.println("----------------------------"); + System.out.println("cardinality : " + cardinality); + final HyperLogLogPlusCounterOld oldCounter = getRandOldCounter(p, cardinality); + long oldTime = runTestCase(new TestCase() { + @Override + public void run() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(1024 * 1024); + long totalBytes = 0; + for (int i = 0; i < testTimes; i++) { + buf.clear(); + oldCounter.writeRegisters(buf); + totalBytes += buf.position(); + buf.flip(); + oldCounter.readRegisters(buf); + } + System.out.println("old serialize bytes : " + totalBytes / testTimes + "B"); + } + }); + final HyperLogLogPlusCounterNew newCounter = getRandNewCounter(p, cardinality, RegisterType.DENSE); + long newTime = runTestCase(new TestCase() { + @Override + public void run() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(1024 * 1024); + long totalBytes = 0; + for (int i = 0; i < testTimes; i++) { + buf.clear(); + newCounter.writeRegisters(buf); + totalBytes += buf.position(); + buf.flip(); + newCounter.readRegisters(buf); + } + System.out.println("new serialize bytes : " + totalBytes / testTimes + "B"); + } + }); + assertEquals(RegisterType.DENSE, newCounter.getRegisterType()); + System.out.println("old serialize time : " + oldTime); + System.out.println("new serialize time : " + newTime); + } + HyperLogLogPlusCounterNew.overflowFactor = oldFactor; + } + + interface TestCase { + void run() throws Exception; + } + + public long runTestCase(TestCase testCase) throws Exception { + long startTime = System.currentTimeMillis(); + testCase.run(); + return System.currentTimeMillis() - startTime; + } + + public HyperLogLogPlusCounterOld getRandOldCounter(int p, int num) { + HyperLogLogPlusCounterOld c = new HyperLogLogPlusCounterOld(p); + for (int i = 0; i < num; i++) + c.add(i); + return c; + } + + public HyperLogLogPlusCounterNew getRandNewCounter(int p, int num) { + HyperLogLogPlusCounterNew c = new HyperLogLogPlusCounterNew(p); + for (int i = 0; i < num; i++) + c.add(i); + return c; + } + + public HyperLogLogPlusCounterNew getRandNewCounter(int p, int num, RegisterType type) { + HyperLogLogPlusCounterNew c = new HyperLogLogPlusCounterNew(p, type); + for (int i = 0; i < num; i++) + c.add(i); + return c; + } + + public static int[] getTestDataDivide(int m) { + return new int[] { 1, 5, 10, 100, m / 200, m / 100, m / 50, m / 20, m / 10, m }; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 21af1e6..5445491 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -53,7 +53,7 @@ import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.kv.RowKeyEncoder; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -76,7 +76,7 @@ public class CubeStatsReader { final int samplingPercentage; final int mapperNumberOfFirstBuild; // becomes meaningless after merge final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge - final Map cuboidRowEstimatesHLL; + final Map cuboidRowEstimatesHLL; final CuboidScheduler cuboidScheduler; public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { @@ -96,7 +96,7 @@ public class CubeStatsReader { int percentage = 100; int mapperNumber = 0; double mapperOverlapRatio = 0; - Map counterMap = Maps.newHashMap(); + Map counterMap = Maps.newHashMap(); LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf); BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf); @@ -108,7 +108,7 @@ public class CubeStatsReader { } else if (key.get() == -2) { mapperNumber = Bytes.toInt(value.getBytes()); } else if (key.get() > 0) { - HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(kylinConfig.getCubeStatsHLLPrecision()); + HyperLogLogPlusCounterNew hll = new HyperLogLogPlusCounterNew(kylinConfig.getCubeStatsHLLPrecision()); ByteArray byteArray = new ByteArray(value.getBytes()); hll.readRegisters(byteArray.asBuffer()); counterMap.put(key.get(), hll); @@ -161,9 +161,9 @@ public class CubeStatsReader { return mapperOverlapRatioOfFirstBuild; } - public static Map getCuboidRowCountMapFromSampling(Map hllcMap, int samplingPercentage) { + public static Map getCuboidRowCountMapFromSampling(Map hllcMap, int samplingPercentage) { Map cuboidRowCountMap = Maps.newHashMap(); - for (Map.Entry entry : hllcMap.entrySet()) { + for (Map.Entry entry : hllcMap.entrySet()) { // No need to adjust according sampling percentage. Assumption is that data set is far // more than cardinality. Even a percentage of the data should already see all cardinalities. cuboidRowCountMap.put(entry.getKey(), entry.getValue().getCountEstimate()); http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index 74a2107..219cdf2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -33,17 +33,17 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.measure.BufferedMeasureCodec; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; public class CubeStatsWriter { public static void writeCuboidStatistics(Configuration conf, Path outputPath, // - Map cuboidHLLMap, int samplingPercentage) throws IOException { + Map cuboidHLLMap, int samplingPercentage) throws IOException { writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0); } public static void writeCuboidStatistics(Configuration conf, Path outputPath, // - Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { + Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); List allCuboids = new ArrayList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 776d750..0d388c7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -47,7 +47,7 @@ import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsWriter; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +64,7 @@ public class FactDistinctColumnsReducer extends KylinReducer columnList; private String statisticsOutput = null; private List baseCuboidRowCountInMappers; - protected Map cuboidHLLMap = null; + protected Map cuboidHLLMap = null; protected long baseCuboidId; protected CubeDesc cubeDesc; private long totalRowsBeforeMerge = 0; @@ -156,7 +156,7 @@ public class FactDistinctColumnsReducer extends KylinReducer extends FactDistinctColumnsMap protected CuboidScheduler cuboidScheduler = null; protected int nRowKey; private Integer[][] allCuboidsBitSet = null; - private HyperLogLogPlusCounter[] allCuboidsHLL = null; + private HyperLogLogPlusCounterNew[] allCuboidsHLL = null; private Long[] cuboidIds; private HashFunction hf = null; private int rowCount = 0; @@ -76,9 +76,9 @@ public class FactDistinctHiveColumnsMapper extends FactDistinctColumnsMap allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]); cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]); - allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length]; + allCuboidsHLL = new HyperLogLogPlusCounterNew[cuboidIds.length]; for (int i = 0; i < cuboidIds.length; i++) { - allCuboidsHLL[i] = new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()); + allCuboidsHLL[i] = new HyperLogLogPlusCounterNew(cubeDesc.getConfig().getCubeStatsHLLPrecision()); } hf = Hashing.murmur3_32(); @@ -207,7 +207,7 @@ public class FactDistinctHiveColumnsMapper extends FactDistinctColumnsMap if (collectStatistics) { ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); // output each cuboid's hll to reducer, key is 0 - cuboidId - HyperLogLogPlusCounter hll; + HyperLogLogPlusCounterNew hll; for (int i = 0; i < cuboidIds.length; i++) { hll = allCuboidsHLL[i]; http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java index 88f6ba2..e839989 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java @@ -47,7 +47,7 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +56,7 @@ import com.google.common.collect.Maps; public class MergeStatisticsStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(MergeStatisticsStep.class); - protected Map cuboidHLLMap = Maps.newHashMap(); + protected Map cuboidHLLMap = Maps.newHashMap(); public MergeStatisticsStep() { super(); @@ -100,7 +100,7 @@ public class MergeStatisticsStep extends AbstractExecutable { // sampling percentage; averageSamplingPercentage += Bytes.toInt(value.getBytes()); } else if (key.get() > 0) { - HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(kylinConf.getCubeStatsHLLPrecision()); + HyperLogLogPlusCounterNew hll = new HyperLogLogPlusCounterNew(kylinConf.getCubeStatsHLLPrecision()); ByteArray byteArray = new ByteArray(value.getBytes()); hll.readRegisters(byteArray.asBuffer()); http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java index 89d23fa..cae3b62 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.commons.lang.RandomStringUtils; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.junit.Before; import org.junit.Test; @@ -45,7 +45,7 @@ public class CubeSamplingTest { private Integer[][] allCuboidsBitSet; private HashFunction hf = null; private long baseCuboidId; - private HyperLogLogPlusCounter[] allCuboidsHLL = null; + private HyperLogLogPlusCounterNew[] allCuboidsHLL = null; private final byte[] seperator = Bytes.toBytes(","); @Before @@ -61,9 +61,9 @@ public class CubeSamplingTest { allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]); System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids."); - allCuboidsHLL = new HyperLogLogPlusCounter[allCuboids.size()]; + allCuboidsHLL = new HyperLogLogPlusCounterNew[allCuboids.size()]; for (int i = 0; i < allCuboids.size(); i++) { - allCuboidsHLL[i] = new HyperLogLogPlusCounter(14); + allCuboidsHLL[i] = new HyperLogLogPlusCounterNew(14); } // hf = Hashing.goodFastHash(32); http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java index ca8684f..a00db94 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.CubeStatsWriter; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.junit.Test; import com.google.common.collect.Maps; @@ -48,7 +48,7 @@ public class FactDistinctColumnsReducerTest { } System.out.println(outputPath); - Map cuboidHLLMap = Maps.newHashMap(); + Map cuboidHLLMap = Maps.newHashMap(); CubeStatsWriter.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100); FileSystem.getLocal(conf).delete(outputPath, true); http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 10c74f3..76212c8 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -83,7 +83,7 @@ import org.apache.kylin.engine.spark.cube.DefaultTupleConverter; import org.apache.kylin.engine.spark.util.IteratorUtils; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureAggregators; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -241,15 +241,15 @@ public class SparkCubing extends AbstractApplication { } } - private Map sampling(final JavaRDD> rowJavaRDD, final String cubeName, String segmentId) throws Exception { + private Map sampling(final JavaRDD> rowJavaRDD, final String cubeName, String segmentId) throws Exception { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); CubeDesc cubeDesc = cubeInstance.getDescriptor(); CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc); List allCuboidIds = cuboidScheduler.getAllCuboidIds(); - final HashMap zeroValue = Maps.newHashMap(); + final HashMap zeroValue = Maps.newHashMap(); for (Long id : allCuboidIds) { - zeroValue.put(id, new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision())); + zeroValue.put(id, new HyperLogLogPlusCounterNew(cubeDesc.getConfig().getCubeStatsHLLPrecision())); } CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); @@ -278,12 +278,12 @@ public class SparkCubing extends AbstractApplication { row_hashcodes[i] = new ByteArray(); } - final HashMap samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2, List, HashMap>() { + final HashMap samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2, List, HashMap>() { final HashFunction hashFunction = Hashing.murmur3_128(); @Override - public HashMap call(HashMap v1, List v2) throws Exception { + public HashMap call(HashMap v1, List v2) throws Exception { for (int i = 0; i < nRowKey; i++) { Hasher hc = hashFunction.newHasher(); String colValue = v2.get(rowKeyColumnIndexes[i]); @@ -296,7 +296,7 @@ public class SparkCubing extends AbstractApplication { for (Map.Entry entry : allCuboidsBitSet.entrySet()) { Hasher hc = hashFunction.newHasher(); - HyperLogLogPlusCounter counter = v1.get(entry.getKey()); + HyperLogLogPlusCounterNew counter = v1.get(entry.getKey()); final Integer[] cuboidBitSet = entry.getValue(); for (int position = 0; position < cuboidBitSet.length; position++) { hc.putBytes(row_hashcodes[cuboidBitSet[position]].array()); @@ -305,14 +305,14 @@ public class SparkCubing extends AbstractApplication { } return v1; } - }, new Function2, HashMap, HashMap>() { + }, new Function2, HashMap, HashMap>() { @Override - public HashMap call(HashMap v1, HashMap v2) throws Exception { + public HashMap call(HashMap v1, HashMap v2) throws Exception { Preconditions.checkArgument(v1.size() == v2.size()); Preconditions.checkArgument(v1.size() > 0); - for (Map.Entry entry : v1.entrySet()) { - final HyperLogLogPlusCounter counter1 = entry.getValue(); - final HyperLogLogPlusCounter counter2 = v2.get(entry.getKey()); + for (Map.Entry entry : v1.entrySet()) { + final HyperLogLogPlusCounterNew counter1 = entry.getValue(); + final HyperLogLogPlusCounterNew counter2 = v2.get(entry.getKey()); counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null")); } return v1; @@ -470,7 +470,7 @@ public class SparkCubing extends AbstractApplication { ClassUtil.addClasspath(confPath); } - private byte[][] createHTable(String cubeName, String segmentId, Map samplingResult) throws Exception { + private byte[][] createHTable(String cubeName, String segmentId, Map samplingResult) throws Exception { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); @@ -614,7 +614,7 @@ public class SparkCubing extends AbstractApplication { } }); - final Map samplingResult = sampling(rowJavaRDD, cubeName, segmentId); + final Map samplingResult = sampling(rowJavaRDD, cubeName, segmentId); final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult); final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys); http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index 06a07ca..230249f 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -35,18 +35,18 @@ import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.measure.BufferedMeasureCodec; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; /** * @author Jack - * + * */ public class ColumnCardinalityMapper extends KylinMapper { - private Map hllcMap = new HashMap(); + private Map hllcMap = new HashMap(); public static final String DEFAULT_DELIM = ","; private int counter = 0; @@ -87,9 +87,9 @@ public class ColumnCardinalityMapper extends KylinMapper extends KylinMapper { public static final int ONE = 1; - private Map hllcMap = new HashMap(); + private Map hllcMap = new HashMap(); @Override protected void setup(Context context) throws IOException { @@ -53,16 +53,16 @@ public class ColumnCardinalityReducer extends KylinReducer