beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [26/50] [abbrv] beam git commit: [BEAM-2728] Extension for sketch-based statistics : HyperLogLog
Date Tue, 07 Nov 2017 16:54:42 GMT
[BEAM-2728] Extension for sketch-based statistics : HyperLogLog


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fd58a423
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fd58a423
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fd58a423

Branch: refs/heads/mr-runner
Commit: fd58a423099b5aea5cd78c862e81c6a03bbf6521
Parents: 4f4632c
Author: Arnaud Fournier <arnaudfournier921@gmail.com>
Authored: Thu Jul 20 16:57:38 2017 +0200
Committer: Eugene Kirpichov <ekirpichov@gmail.com>
Committed: Fri Nov 3 15:14:57 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 sdks/java/extensions/pom.xml                    |   1 +
 sdks/java/extensions/sketching/pom.xml          | 104 ++++
 .../sketching/ApproximateDistinct.java          | 573 +++++++++++++++++++
 .../sdk/extensions/sketching/package-info.java  |  22 +
 .../sketching/ApproximateDistinctTest.java      | 209 +++++++
 sdks/java/javadoc/pom.xml                       |   5 +
 7 files changed, 920 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b2ab5d7..baed9ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -502,6 +502,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-extensions-sketching</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-extensions-sorter</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index ec6efb6..5e8d495 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -36,6 +36,7 @@
     <module>jackson</module>
     <module>join-library</module>
     <module>protobuf</module>
+    <module>sketching</module>
     <module>sorter</module>
     <module>sql</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sketching/pom.xml b/sdks/java/extensions/sketching/pom.xml
new file mode 100755
index 0000000..f0538ae
--- /dev/null
+++ b/sdks/java/extensions/sketching/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-extensions-parent</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-extensions-sketching</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Extensions :: Sketching</name>
+
+  <properties>
+    <streamlib.version>2.9.5</streamlib.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>${streamlib.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
new file mode 100644
index 0000000..1da0cc3
--- /dev/null
+++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sketching;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.google.auto.value.AutoValue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link PTransform}s for computing the approximate number of distinct elements in a stream.
+ *
+ * <p>This class relies on the HyperLogLog algorithm, and more precisely HyperLogLog+,
the improved
+ * version of Google.
+ *
+ * <h2>References</h2>
+ *
+ * <p>The implementation comes from <a href="https://github.com/addthis/stream-lib">Addthis'
+ * Stream-lib library</a>. <br>
+ * The original paper of the HyperLogLog is available <a
+ * href="http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf">here</a>. <br>
+ * A paper from the same authors to have a clearer view of the algorithm is available <a
+ * href="http://cscubs.cs.uni-bonn.de/2016/proceedings/paper-03.pdf">here</a>. <br>
+ * Google's HyperLogLog+ version is detailed in <a
+ * href="https://research.google.com/pubs/pub40671.html">this paper</a>.
+ *
+ * <h2>Parameters</h2>
+ *
+ * <p>Two parameters can be tuned in order to control the computation's accuracy:
+ *
+ * <ul>
+ *   <li><b>Precision: {@code p}</b> <br>
+ *       Controls the accuracy of the estimation. The precision value will have an impact
on the
+ *       number of buckets used to store information about the distinct elements. <br>
+ *       In general one can expect a relative error of about {@code 1.1 / sqrt(2^p)}. The
value
+ *       should be of at least 4 to guarantee a minimal accuracy. <br>
+ *       By default, the precision is set to {@code 12} for a relative error of around {@code
2%}.
+ *   <li><b>Sparse Precision: {@code sp}</b> <br>
+ *       Used to create a sparse representation in order to optimize memory and improve accuracy
at
+ *       small cardinalities. <br>
+ *       The value of {@code sp} should be greater than {@code p}, but lower than 32. <br>
+ *       By default, the sparse representation is not used ({@code sp = 0}). One should use
it if
+ *       the cardinality may be less than {@code 12000}.
+ * </ul>
+ *
+ * <h2>Examples</h2>
+ *
+ * <p>There are 2 ways of using this class:
+ *
+ * <ul>
+ *   <li>Use the {@link PTransform}s that return {@code PCollection<Long>} corresponding
to the
+ *       estimate number of distinct elements in the input {@link PCollection} of objects
or for
+ *       each key in a {@link PCollection} of {@link KV}s.
+ *   <li>Use the {@link ApproximateDistinctFn} {@code CombineFn} that is exposed in
order to make
+ *       advanced processing involving the {@link HyperLogLogPlus} structure which resumes
the
+ *       stream.
+ * </ul>
+ *
+ * <h3>Using the Transforms</h3>
+ *
+ * <h4>Example 1: globally default use</h4>
+ *
+ * <pre>{@code
+ * PCollection<Integer> input = ...;
+ * PCollection<Long> hllSketch = input.apply(ApproximateDistinct.<Integer>globally());
+ * }</pre>
+ *
+ * <h4>Example 2: per key default use</h4>
+ *
+ * <pre>{@code
+ * PCollection<Integer, String> input = ...;
+ * PCollection<Integer, Long> hllSketches = input.apply(ApproximateDistinct
+ *                .<Integer, String>perKey());
+ * }</pre>
+ *
+ * <h4>Example 3: tune precision and use sparse representation</h4>
+ *
+ * <p>One can tune the precision and sparse precision parameters in order to control
the accuracy
+ * and the memory. The tuning works exactly the same for {@link #globally()} and {@link #perKey()}.
+ *
+ * <pre>{@code
+ * int precision = 15;
+ * int sparsePrecision = 25;
+ * PCollection<Double> input = ...;
+ * PCollection<Long> hllSketch = input.apply(ApproximateDistinct
+ *                .<Double>globally()
+ *                .withPrecision(precision)
+ *                .withSparsePrecision(sparsePrecision));
+ * }</pre>
+ *
+ * <h3>Using the {@link ApproximateDistinctFn} CombineFn</h3>
+ *
+ * <p>The CombineFn does the same thing as the transform but it can be used in cases
where you want
+ * to manipulate the {@link HyperLogLogPlus} sketch, for example if you want to store it
in a
+ * database to have a backup. It can also be used in stateful processing or in {@link
+ * org.apache.beam.sdk.transforms.CombineFns.ComposedCombineFn}.
+ *
+ * <h4>Example 1: basic use</h4>
+ *
+ * <p>This example is not really interesting but show how you can properly create an
{@link
+ * ApproximateDistinctFn}. One must always specify a coder using the {@link
+ * ApproximateDistinctFn#create(Coder)} method.
+ *
+ * <pre>{@code
+ * PCollection<Integer> input = ...;
+ * PCollection<HyperLogLogPlus> output = input.apply(Combine.globally(ApproximateDistinctFn
+ *                 .<Integer>create(BigEndianIntegerCoder.of()));
+ * }</pre>
+ *
+ * <h4>Example 2: use the {@link CombineFn} in a stateful {@link ParDo}</h4>
+ *
+ * <p>One may want to use the {@link ApproximateDistinctFn} in a stateful ParDo in
order to make
+ * some processing depending on the current cardinality of the stream. <br>
+ * For more information about stateful processing see the blog spot on this topic <a
+ * href="https://beam.apache.org/blog/2017/02/13/stateful-processing.html">here</a>.
+ *
+ * <p>Here is an example of {@link DoFn} using an {@link ApproximateDistinctFn} as
a {@link
+ * org.apache.beam.sdk.state.CombiningState}:
+ *
+ * <pre><code>
+ * {@literal class StatefulCardinality<V> extends DoFn<V, OutputT>} {
+ *   {@literal @StateId}("hyperloglog")
+ *   {@literal private final StateSpec<CombiningState<V, HyperLogLogPlus, HyperLogLogPlus>>}
+ *      indexSpec;
+ *
+ *   {@literal public StatefulCardinality(ApproximateDistinctFn<V> fn)} {
+ *     indexSpec = StateSpecs.combining(fn);
+ *   }
+ *
+ *  {@literal @ProcessElement}
+ *   public void processElement(
+ *      ProcessContext context,
+ *      {@literal @StateId}("hllSketch")
+ *      {@literal CombiningState<V, HyperLogLogPlus, HyperLogLogPlus> hllSketch)} {
+ *     long current = MoreObjects.firstNonNull(hllSketch.getAccum().cardinality(), 0L);
+ *     hllSketch.add(context.element());
+ *     context.output(...);
+ *   }
+ * }
+ * </code></pre>
+ *
+ * <p>Then the {@link DoFn} can be called like this:
+ *
+ * <pre>{@code
+ * PCollection<V> input = ...;
+ * ApproximateDistinctFn<V> myFn = ApproximateDistinctFn.create(input.getCoder());
+ * PCollection<V> = input.apply(ParDo.of(new StatefulCardinality<>(myFn)));
+ * }</pre>
+ *
+ * <h4>Example 3: use the {@link RetrieveCardinality} utility class</h4>
+ *
+ * <p>One may want to retrieve the cardinality as a long after making some advanced
processing using
+ * the {@link HyperLogLogPlus} structure. <br>
+ * The {@link RetrieveCardinality} utility class provides an easy way to do so:
+ *
+ * <pre>{@code
+ * PCollection<MyObject> input = ...;
+ * PCollection<HyperLogLogPlus> hll = input.apply(Combine.globally(ApproximateDistinctFn
+ *                  .<MyObject>create(new MyObjectCoder())
+ *                  .withSparseRepresentation(20)));
+ *
+ *  // Some advanced processing
+ *  PCollection<SomeObject> advancedResult = hll.apply(...);
+ *
+ *  PCollection<Long> cardinality = hll.apply(ApproximateDistinct.RetrieveCardinality.globally());
+ *
+ * }</pre>
+ *
+ * <p><b>Warning: this class is experimental.</b> Its API is subject to
change in future versions of
+ * Beam. For example, it may be merged with the {@link
+ * org.apache.beam.sdk.transforms.ApproximateUnique} transform.
+ */
+@Experimental
+public final class ApproximateDistinct {
+
+  /**
+   * Computes the approximate number of distinct elements in the input {@code PCollection<InputT>}
+   * and returns a {@code PCollection<Long>}.
+   *
+   * @param <InputT> the type of the elements in the input {@link PCollection}
+   */
+  public static <InputT> GloballyDistinct<InputT> globally() {
+    return GloballyDistinct.<InputT>builder().build();
+  }
+
+  /**
+   * Like {@link #globally} but per key, i.e computes the approximate number of distinct
values per
+   * key in a {@code PCollection<KV<K, V>>} and returns {@code PCollection<KV<K,
Long>>}.
+   *
+   * @param <K> type of the keys mapping the elements
+   * @param <V> type of the values being combined per key
+   */
+  public static <K, V> PerKeyDistinct<K, V> perKey() {
+    return PerKeyDistinct.<K, V>builder().build();
+  }
+
+  /**
+   * Implementation of {@link #globally()}.
+   *
+   * @param <InputT>
+   */
+  @AutoValue
+  public abstract static class GloballyDistinct<InputT>
+      extends PTransform<PCollection<InputT>, PCollection<Long>> {
+
+    abstract int precision();
+
+    abstract int sparsePrecision();
+
+    abstract Builder<InputT> toBuilder();
+
+    static <InputT> Builder<InputT> builder() {
+      return new AutoValue_ApproximateDistinct_GloballyDistinct.Builder<InputT>()
+          .setPrecision(12)
+          .setSparsePrecision(0);
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<InputT> {
+      abstract Builder<InputT> setPrecision(int p);
+
+      abstract Builder<InputT> setSparsePrecision(int sp);
+
+      abstract GloballyDistinct<InputT> build();
+    }
+
+    public GloballyDistinct<InputT> withPrecision(int p) {
+      return toBuilder().setPrecision(p).build();
+    }
+
+    public GloballyDistinct<InputT> withSparsePrecision(int sp) {
+      return toBuilder().setSparsePrecision(sp).build();
+    }
+
+    @Override
+    public PCollection<Long> expand(PCollection<InputT> input) {
+      return input
+          .apply(
+              "Compute HyperLogLog Structure",
+              Combine.globally(
+                  ApproximateDistinctFn.<InputT>create(input.getCoder())
+                      .withPrecision(this.precision())
+                      .withSparseRepresentation(this.sparsePrecision())))
+          .apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.globally()));
+    }
+  }
+
+  /**
+   * Implementation of {@link #perKey()}.
+   *
+   * @param <K>
+   * @param <V>
+   */
+  @AutoValue
+  public abstract static class PerKeyDistinct<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>>
{
+
+    abstract int precision();
+
+    abstract int sparsePrecision();
+
+    abstract Builder<K, V> toBuilder();
+
+    static <K, V> Builder<K, V> builder() {
+      return new AutoValue_ApproximateDistinct_PerKeyDistinct.Builder<K, V>()
+          .setPrecision(12)
+          .setSparsePrecision(0);
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract Builder<K, V> setPrecision(int p);
+
+      abstract Builder<K, V> setSparsePrecision(int sp);
+
+      abstract PerKeyDistinct<K, V> build();
+    }
+
+    public PerKeyDistinct<K, V> withPrecision(int p) {
+      return toBuilder().setPrecision(p).build();
+    }
+
+    public PerKeyDistinct<K, V> withSparsePrecision(int sp) {
+      return toBuilder().setSparsePrecision(sp).build();
+    }
+
+    @Override
+    public PCollection<KV<K, Long>> expand(PCollection<KV<K, V>>
input) {
+      KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
+      return input
+          .apply(
+              Combine.<K, V, HyperLogLogPlus>perKey(
+                  ApproximateDistinctFn.<V>create(inputCoder.getValueCoder())
+                      .withPrecision(this.precision())
+                      .withSparseRepresentation(this.sparsePrecision())))
+          .apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.<K>perKey()));
+    }
+  }
+
+  /**
+   * Implements the {@link CombineFn} of {@link ApproximateDistinct} transforms.
+   *
+   * @param <InputT> the type of the elements in the input {@link PCollection}
+   */
+  public static class ApproximateDistinctFn<InputT>
+      extends CombineFn<InputT, HyperLogLogPlus, HyperLogLogPlus> {
+
+    private final int p;
+
+    private final int sp;
+
+    private final Coder<InputT> inputCoder;
+
+    private ApproximateDistinctFn(int p, int sp, Coder<InputT> coder) {
+      this.p = p;
+      this.sp = sp;
+      inputCoder = coder;
+    }
+
+    /**
+     * Returns an {@link ApproximateDistinctFn} combiner with the given input coder.
+     *
+     * @param coder the coder that encodes the elements' type
+     */
+    public static <InputT> ApproximateDistinctFn<InputT> create(Coder<InputT>
coder) {
+      try {
+        coder.verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        throw new IllegalArgumentException("Coder is not deterministic ! " + e.getMessage(),
e);
+      }
+      return new ApproximateDistinctFn<>(12, 0, coder);
+    }
+
+    /**
+     * Returns a new {@link ApproximateDistinctFn} combiner with a new precision {@code p}.
+     *
+     * <p>Keep in mind that {@code p} cannot be lower than 4, because the estimation
would be too
+     * inaccurate.
+     *
+     * <p>See {@link ApproximateDistinct#precisionForRelativeError(double)} and {@link
+     * ApproximateDistinct#relativeErrorForPrecision(int)} to have more information about
the
+     * relationship between precision and relative error.
+     *
+     * @param p the precision value for the normal representation
+     */
+    public ApproximateDistinctFn<InputT> withPrecision(int p) {
+      checkArgument(p >= 4, "Expected: p >= 4. Actual: p = %s", p);
+      return new ApproximateDistinctFn<>(p, this.sp, this.inputCoder);
+    }
+
+    /**
+     * Returns a new {@link ApproximateDistinctFn} combiner with a sparse representation
of
+     * precision {@code sp}.
+     *
+     * <p>Values above 32 are not yet supported by the AddThis version of HyperLogLog+.
+     *
+     * <p>Fore more information about the sparse representation, read Google's paper
available <a
+     * href="https://research.google.com/pubs/pub40671.html">here</a>.
+     *
+     * @param sp the precision of HyperLogLog+' sparse representation
+     */
+    public ApproximateDistinctFn<InputT> withSparseRepresentation(int sp) {
+      checkArgument(
+          (sp > this.p && sp < 32) || (sp == 0),
+          "Expected: p <= sp <= 32." + "Actual: p = %s, sp = %s",
+          this.p,
+          sp);
+      return new ApproximateDistinctFn<>(this.p, sp, this.inputCoder);
+    }
+
+    @Override
+    public HyperLogLogPlus createAccumulator() {
+      return new HyperLogLogPlus(p, sp);
+    }
+
+    @Override
+    public HyperLogLogPlus addInput(HyperLogLogPlus acc, InputT record) {
+      try {
+        acc.offer(CoderUtils.encodeToByteArray(inputCoder, record));
+      } catch (CoderException e) {
+        throw new IllegalStateException("The input value cannot be encoded: " + e.getMessage(),
e);
+      }
+      return acc;
+    }
+
+    /** Output the whole structure so it can be queried, reused or stored easily. */
+    @Override
+    public HyperLogLogPlus extractOutput(HyperLogLogPlus accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public HyperLogLogPlus mergeAccumulators(Iterable<HyperLogLogPlus> accumulators)
{
+      HyperLogLogPlus mergedAccum = createAccumulator();
+      for (HyperLogLogPlus accum : accumulators) {
+        try {
+          mergedAccum.addAll(accum);
+        } catch (CardinalityMergeException e) {
+          // Should never happen because only HyperLogLogPlus accumulators are instantiated.
+          throw new IllegalStateException(
+              "The accumulators cannot be merged: " + e.getMessage(), e);
+        }
+      }
+      return mergedAccum;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(DisplayData.item("p", p).withLabel("precision"))
+          .add(DisplayData.item("sp", sp).withLabel("sparse representation precision"));
+    }
+  }
+
+  /** Coder for {@link HyperLogLogPlus} class. */
+  public static class HyperLogLogPlusCoder extends CustomCoder<HyperLogLogPlus> {
+
+    private static final HyperLogLogPlusCoder INSTANCE = new HyperLogLogPlusCoder();
+
+    private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+
+    public static HyperLogLogPlusCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(HyperLogLogPlus value, OutputStream outStream) throws IOException
{
+      if (value == null) {
+        throw new CoderException("cannot encode a null HyperLogLogPlus sketch");
+      }
+      BYTE_ARRAY_CODER.encode(value.getBytes(), outStream);
+    }
+
+    @Override
+    public HyperLogLogPlus decode(InputStream inStream) throws IOException {
+      return HyperLogLogPlus.Builder.build(BYTE_ARRAY_CODER.decode(inStream));
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(HyperLogLogPlus value) {
+      return true;
+    }
+
+    @Override
+    protected long getEncodedElementByteSize(HyperLogLogPlus value) throws IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null HyperLogLogPlus sketch");
+      }
+      return value.sizeof();
+    }
+  }
+
+  /**
+   * Utility class that provides {@link DoFn}s to retrieve the cardinality from a {@link
+   * HyperLogLogPlus} structure in a global or perKey context.
+   */
+  public static class RetrieveCardinality {
+
+    public static <K> DoFn<KV<K, HyperLogLogPlus>, KV<K, Long>> perKey()
{
+      return new DoFn<KV<K, HyperLogLogPlus>, KV<K, Long>>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          KV<K, HyperLogLogPlus> kv = c.element();
+          c.output(KV.of(kv.getKey(), kv.getValue().cardinality()));
+        }
+      };
+    }
+
+    public static DoFn<HyperLogLogPlus, Long> globally() {
+      return new DoFn<HyperLogLogPlus, Long>() {
+        @ProcessElement
+        public void apply(ProcessContext c) {
+          c.output(c.element().cardinality());
+        }
+      };
+    }
+  }
+
+  /**
+   * Computes the precision based on the desired relative error.
+   *
+   * <p>According to the paper, the mean squared error is bounded by the following
formula:
+   *
+   * <pre>b(m) / sqrt(m)
+   * Where m is the number of buckets used ({@code p = log2(m)})
+   * and {@code b(m) < 1.106} for {@code m > 16 (and p > 4)}.
+   * </pre>
+   *
+   * <br>
+   * <b>WARNING:</b> <br>
+   * This does not mean relative error in the estimation <b>can't</b> be higher.
<br>
+   * This only means that on average the relative error will be lower than the desired relative
+   * error. <br>
+   * Nevertheless, the more elements arrive in the {@link PCollection}, the lower the variation
will
+   * be. <br>
+   * Indeed, this is like when you throw a dice millions of time: the relative frequency
of each
+   * different result <code>{1,2,3,4,5,6}</code> will get closer to {@code 1/6}.
+   *
+   * @param relativeError the mean squared error should be in the interval ]0,1]
+   * @return the minimum precision p in order to have the desired relative error on average.
+   */
+  public static long precisionForRelativeError(double relativeError) {
+    return Math.round(
+        Math.ceil(Math.log(Math.pow(1.106, 2.0) / Math.pow(relativeError, 2.0)) / Math.log(2)));
+  }
+
+  /**
+   * @param p the precision i.e. the number of bits used for indexing the buckets
+   * @return the Mean squared error of the Estimation of cardinality to expect for the given
value
+   *     of p.
+   */
+  public static double relativeErrorForPrecision(int p) {
+    if (p < 4) {
+      return 1.0;
+    }
+    double betaM;
+    switch (p) {
+      case 4:
+        betaM = 1.156;
+        break;
+      case 5:
+        betaM = 1.2;
+        break;
+      case 6:
+        betaM = 1.104;
+        break;
+      case 7:
+        betaM = 1.096;
+        break;
+      default:
+        betaM = 1.05;
+        break;
+    }
+    return betaM / Math.sqrt(Math.exp(p * Math.log(2)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java
b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java
new file mode 100755
index 0000000..2e8d60e
--- /dev/null
+++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities for computing statistical indicators using probabilistic sketches.
+ */
+package org.apache.beam.sdk.extensions.sketching;

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java
b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java
new file mode 100644
index 0000000..cdbcc45
--- /dev/null
+++ b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sketching;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.extensions.sketching.ApproximateDistinct.ApproximateDistinctFn;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link ApproximateDistinct}. */
+@RunWith(JUnit4.class)
+public class ApproximateDistinctTest implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ApproximateDistinctTest.class);
+
+  @Rule public final transient TestPipeline tp = TestPipeline.create();
+
+  @Test
+  public void smallCardinality() {
+    final int smallCard = 1000;
+    final int p = 6;
+    final double expectedErr = 1.104 / Math.sqrt(p);
+
+    List<Integer> small = new ArrayList<>();
+    for (int i = 0; i < smallCard; i++) {
+      small.add(i);
+    }
+
+    PCollection<Long> cardinality =
+        tp.apply("small stream", Create.<Integer>of(small))
+            .apply("small cardinality", ApproximateDistinct.<Integer>globally().withPrecision(p));
+
+    PAssert.that("Not Accurate Enough", cardinality)
+        .satisfies(new VerifyAccuracy(smallCard, expectedErr));
+
+    tp.run();
+  }
+
+  @Test
+  public void bigCardinality() {
+    final int cardinality = 15000;
+    final int p = 15;
+    final int sp = 20;
+    final double expectedErr = 1.04 / Math.sqrt(p);
+
+    List<Integer> stream = new ArrayList<>();
+    for (int i = 1; i <= cardinality; i++) {
+      stream.addAll(Collections.nCopies(2, i));
+    }
+    Collections.shuffle(stream);
+
+    PCollection<Long> res =
+        tp.apply("big stream", Create.<Integer>of(stream))
+            .apply(
+                "big cardinality",
+                ApproximateDistinct.<Integer>globally().withPrecision(p).withSparsePrecision(sp));
+
+    PAssert.that("Verify Accuracy for big cardinality", res)
+        .satisfies(new VerifyAccuracy(cardinality, expectedErr));
+
+    tp.run();
+  }
+
+  @Test
+  public void perKey() {
+    final int cardinality = 1000;
+    final int p = 15;
+    final double expectedErr = 1.04 / Math.sqrt(p);
+
+    List<Integer> stream = new ArrayList<>();
+    for (int i = 1; i <= cardinality; i++) {
+      stream.addAll(Collections.nCopies(2, i));
+    }
+    Collections.shuffle(stream);
+
+    PCollection<Long> results =
+        tp.apply("per key stream", Create.of(stream))
+            .apply("create keys", WithKeys.<Integer, Integer>of(1))
+            .apply(
+                "per key cardinality",
+                ApproximateDistinct.<Integer, Integer>perKey().withPrecision(p))
+            .apply("extract values", Values.<Long>create());
+
+    PAssert.that("Verify Accuracy for cardinality per key", results)
+        .satisfies(new VerifyAccuracy(cardinality, expectedErr));
+
+    tp.run();
+  }
+
+  @Test
+  public void customObject() {
+    final int cardinality = 500;
+    final int p = 15;
+    final double expectedErr = 1.04 / Math.sqrt(p);
+
+    Schema schema =
+        SchemaBuilder.record("User")
+            .fields()
+            .requiredString("Pseudo")
+            .requiredInt("Age")
+            .endRecord();
+    List<GenericRecord> users = new ArrayList<>();
+    for (int i = 1; i <= cardinality; i++) {
+      GenericData.Record newRecord = new GenericData.Record(schema);
+      newRecord.put("Pseudo", "User" + i);
+      newRecord.put("Age", i);
+      users.add(newRecord);
+    }
+    PCollection<Long> results =
+        tp.apply("Create stream", Create.of(users).withCoder(AvroCoder.of(schema)))
+            .apply(
+                "Test custom object",
+                ApproximateDistinct.<GenericRecord>globally().withPrecision(p));
+
+    PAssert.that("Verify Accuracy for custom object", results)
+        .satisfies(new VerifyAccuracy(cardinality, expectedErr));
+
+    tp.run();
+  }
+
+  @Test
+  public void testCoder() throws Exception {
+    HyperLogLogPlus hllp = new HyperLogLogPlus(12, 18);
+    for (int i = 0; i < 10; i++) {
+      hllp.offer(i);
+    }
+    CoderProperties.<HyperLogLogPlus>coderDecodeEncodeEqual(
+        ApproximateDistinct.HyperLogLogPlusCoder.of(), hllp);
+  }
+
+  @Test
+  public void testDisplayData() {
+    final ApproximateDistinctFn<Integer> fnWithPrecision =
+        ApproximateDistinctFn.create(BigEndianIntegerCoder.of()).withPrecision(23);
+
+    assertThat(DisplayData.from(fnWithPrecision), hasDisplayItem("p", 23));
+    assertThat(DisplayData.from(fnWithPrecision), hasDisplayItem("sp", 0));
+  }
+
+  class VerifyAccuracy implements SerializableFunction<Iterable<Long>, Void>
{
+
+    private final int expectedCard;
+
+    private final double expectedError;
+
+    VerifyAccuracy(int expectedCard, double expectedError) {
+      this.expectedCard = expectedCard;
+      this.expectedError = expectedError;
+    }
+
+    @Override
+    public Void apply(Iterable<Long> input) {
+      for (Long estimate : input) {
+        boolean isAccurate = Math.abs(estimate - expectedCard) / expectedCard < expectedError;
+        Assert.assertTrue(
+            "not accurate enough : \nExpected Cardinality : "
+                + expectedCard
+                + "\nComputed Cardinality : "
+                + estimate,
+            isAccurate);
+      }
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 79ac933..85440ff 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -94,6 +94,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-sketching</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-extensions-sorter</artifactId>
     </dependency>
 


Mime
View raw message