spark-commits mailing list archives

From dav...@apache.org
Subject [1/2] spark git commit: [SPARK-9741] [SQL] Approximate Count Distinct using the new UDAF interface.
Date Wed, 30 Sep 2015 17:13:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master 2931e89f0 -> 16fd2a2f4


http://git-wip-us.apache.org/repos/asf/spark/blob/16fd2a2f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
new file mode 100644
index 0000000..ecc0644
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.util.Random
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class HyperLogLogPlusPlusSuite extends SparkFunSuite {
+
+  /** Create an HLL++ instance, an input row, and an aggregation buffer. */
+  def createEstimator(rsd: Double, dt: DataType = IntegerType):
+      (HyperLogLogPlusPlus, MutableRow, MutableRow) = {
+    val input = new SpecificMutableRow(Seq(dt))
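+    // Bind the aggregate's child expression to column 0 of the input row;
+    // the column is marked nullable so that null handling is exercised.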
+    val hll = new HyperLogLogPlusPlus(new BoundReference(0, dt, true), rsd)
+    val buffer = createBuffer(hll)
+    (hll, input, buffer)
+  }
+
+  def createBuffer(hll: HyperLogLogPlusPlus): MutableRow = {
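+    // Allocate a row matching the aggregate's buffer schema and let the
+    // aggregate initialize (zero) its registers.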
+    val buffer = new SpecificMutableRow(hll.bufferAttributes.map(_.dataType))
+    hll.initialize(buffer)
+    buffer
+  }
+
+  /**
+   * Evaluate the estimate and check that it is within three standard errors
+   * (3 * rsd) of the true cardinality.
+   */
+  def evaluateEstimate(hll: HyperLogLogPlusPlus, buffer: MutableRow, cardinality: Int): Unit = {
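+    // The estimation error is approximately normally distributed, so a
+    // three-sigma bound should hold in roughly 99.7% of runs.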
+    val estimate = hll.eval(buffer).asInstanceOf[Long].toDouble
+    val error = math.abs((estimate / cardinality.toDouble) - 1.0d)
+    assert(error < hll.trueRsd * 3.0d, "Error should be within 3 std. errors.")
+  }
+
+  test("add nulls") {
+    val (hll, input, buffer) = createEstimator(0.05)
+    input.setNullAt(0)
+    hll.update(buffer, input)
+    hll.update(buffer, input)
+    val estimate = hll.eval(buffer).asInstanceOf[Long]
+    assert(estimate == 0L, "Only nulls were added; the estimate should be 0.")
+  }
+
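+  /**
+   * For every combination of rsd and n, insert the values f(0) ... f(n - 1)
+   * into a fresh estimator and check the estimate against the true
+   * cardinality c(n).
+   */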
+  def testCardinalityEstimates(
+      rsds: Seq[Double],
+      ns: Seq[Int],
+      f: Int => Int,
+      c: Int => Int): Unit = {
+    rsds.flatMap(rsd => ns.map(n => (rsd, n))).foreach {
+      case (rsd, n) =>
+        val (hll, input, buffer) = createEstimator(rsd)
+        var i = 0
+        while (i < n) {
+          input.setInt(0, f(i))
+          hll.update(buffer, input)
+          i += 1
+        }
+        evaluateEstimate(hll, buffer, c(n))
+    }
+  }
+
+  test("deterministic cardinality estimation") {
+    val repeats = 10
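+    // Each distinct value is inserted 'repeats' times; duplicates must not
+    // inflate the estimate.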
+    testCardinalityEstimates(
+      Seq(0.1, 0.05, 0.025, 0.01),
+      Seq(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000).map(_ * repeats),
+      i => i / repeats,
+      i => i / repeats)
+  }
+
+  test("random cardinality estimation") {
+    val srng = new Random(323981238L)
+    val seen = mutable.HashSet.empty[Int]
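+    // Random draws can collide, so track the values actually generated and
+    // use the true distinct count as the reference cardinality.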
+    val update = (i: Int) => {
+      val value = srng.nextInt()
+      seen += value
+      value
+    }
+    val eval = (n: Int) => {
+      val cardinality = seen.size
+      seen.clear()
+      cardinality
+    }
+    testCardinalityEstimates(
+      Seq(0.05, 0.01),
+      Seq(100, 10000, 500000),
+      update,
+      eval)
+  }
+
+  // Merging partial buffers must produce the same sketch as building a
+  // single buffer over the full range of values.
+  test("merging HLL instances") {
+    val (hll, input, buffer1a) = createEstimator(0.05)
+    val buffer1b = createBuffer(hll)
+    val buffer2 = createBuffer(hll)
+
+    // Build the first sketch in two halves. Add the lower half:
+    var i = 0
+    while (i < 500000) {
+      input.setInt(0, i)
+      hll.update(buffer1a, input)
+      i += 1
+    }
+
+    // Add the upper half
+    i = 500000
+    while (i < 1000000) {
+      input.setInt(0, i)
+      hll.update(buffer1b, input)
+      i += 1
+    }
+
+    // Merge the lower and upper halves.
+    hll.merge(buffer1a, buffer1b)
+
+    // Build the reference buffer over the full range, in reverse order.
+    i = 999999
+    while (i >= 0) {
+      input.setInt(0, i)
+      hll.update(buffer2, input)
+      i -= 1
+    }
+
+    // Registers only ever keep maxima, so insertion order and partitioning
+    // must not matter: both buffers should be identical.
+    assert(buffer2 == buffer1a, "Buffers should be equal")
+  }
+}
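
The rsd values exercised above reflect the usual HyperLogLog accuracy/memory
trade-off: the standard error of a sketch with m registers is roughly
1.04 / sqrt(m), so halving the rsd quadruples the number of registers. A
minimal, self-contained Scala sketch of that relation (the 1.04 constant is
the classic HyperLogLog figure; Spark's own register sizing may use a
slightly different constant and rounding rule):

    object RsdToRegisters {
      // Registers needed for a target relative standard deviation, rounded
      // up to the next power of two, as register counts typically are.
      def registers(rsd: Double): Int = {
        val m = math.ceil(math.pow(1.04 / rsd, 2.0)).toInt
        Integer.highestOneBit(math.max(1, m) * 2 - 1)
      }

      def main(args: Array[String]): Unit = {
        Seq(0.1, 0.05, 0.025, 0.01).foreach { rsd =>
          println(f"rsd = $rsd%.3f -> ~${registers(rsd)}%d registers")
        }
      }
    }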

