From: kenn@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Tue, 07 Nov 2017 16:54:42 -0000
In-Reply-To: <3df2cdcdb808479dba94f89e78aa2670@git.apache.org>
References: <3df2cdcdb808479dba94f89e78aa2670@git.apache.org>
Subject: [26/50] [abbrv] beam git commit: [BEAM-2728] Extension for sketch-based statistics : HyperLogLog

[BEAM-2728] Extension for sketch-based statistics : HyperLogLog

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fd58a423
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fd58a423
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fd58a423

Branch: refs/heads/mr-runner
Commit: fd58a423099b5aea5cd78c862e81c6a03bbf6521
Parents: 4f4632c
Author: Arnaud Fournier
Authored: Thu Jul 20 16:57:38 2017 +0200
Committer: Eugene Kirpichov
Committed: Fri Nov 3 15:14:57 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 sdks/java/extensions/pom.xml                    |   1 +
 sdks/java/extensions/sketching/pom.xml          | 104 ++++
 .../sketching/ApproximateDistinct.java          | 573 +++++++++++++++++++
 .../sdk/extensions/sketching/package-info.java  |  22 +
 .../sketching/ApproximateDistinctTest.java      | 209 +++++++
 sdks/java/javadoc/pom.xml                       |   5 +
 7 files changed, 920 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b2ab5d7..baed9ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -502,6 +502,12 @@
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-extensions-sketching</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-extensions-sorter</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index ec6efb6..5e8d495 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -36,6 +36,7 @@
     <module>jackson</module>
     <module>join-library</module>
     <module>protobuf</module>
+    <module>sketching</module>
     <module>sorter</module>
     <module>sql</module>

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sketching/pom.xml b/sdks/java/extensions/sketching/pom.xml
new file mode 100755
index 0000000..f0538ae
--- /dev/null
+++ b/sdks/java/extensions/sketching/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-extensions-parent</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-extensions-sketching</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Extensions :: Sketching</name>
+
+  <properties>
+    <streamlib.version>2.9.5</streamlib.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>${streamlib.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
new file mode 100644
index 0000000..1da0cc3
--- /dev/null
+++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sketching;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.google.auto.value.AutoValue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link PTransform}s for computing the approximate number of distinct elements in a stream.
+ *
+ * <p>This class relies on the HyperLogLog algorithm, and more precisely on HyperLogLog+, the
+ * improved version from Google.
+ *
+ * <h3>References</h3>
+ *
+ * <p>The implementation comes from AddThis' Stream-lib library. <br>
+ * The original HyperLogLog paper is available here. <br>
+ * A paper from the same authors that gives a clearer view of the algorithm is available here. <br>
+ * Google's HyperLogLog+ version is detailed in this paper.
+ *
+ * <h3>Parameters</h3>
+ *
+ * <p>Two parameters can be tuned in order to control the computation's accuracy:
+ *
+ * <ul>
+ *   <li><b>Precision: {@code p}</b><br>
+ *       Controls the accuracy of the estimation. The precision value has an impact on the number
+ *       of buckets used to store information about the distinct elements. <br>
+ *       In general one can expect a relative error of about {@code 1.1 / sqrt(2^p)}. The value
+ *       should be at least 4 to guarantee a minimal accuracy. <br>
+ *       By default, the precision is set to {@code 12}, for a relative error of around {@code 2%}.
+ *   <li><b>Sparse Precision: {@code sp}</b><br>
+ *       Used to create a sparse representation in order to optimize memory usage and improve
+ *       accuracy at small cardinalities. <br>
+ *       The value of {@code sp} should be greater than {@code p}, but lower than 32. <br>
+ *       By default, the sparse representation is not used ({@code sp = 0}). One should use it if
+ *       the cardinality may be less than {@code 12000}.
+ * </ul>
+ *
+ * <h3>Examples</h3>
+ *
+ * <p>There are two ways of using this class:
+ *
+ * <ul>
+ *   <li>Use the {@link PTransform}s, which return a {@code PCollection<Long>} corresponding to
+ *       the estimated number of distinct elements in the input {@link PCollection} of objects, or
+ *       to the estimate for each key in a {@link PCollection} of {@link KV}s.
+ *   <li>Use the {@link ApproximateDistinctFn} {@code CombineFn} that is exposed in order to do
+ *       advanced processing involving the {@link HyperLogLogPlus} structure, which summarizes the
+ *       stream.
+ * </ul>
+ *
+ * <h3>Using the Transforms</h3>
+ *
+ * <h4>Example 1: globally default use</h4>
+ *
+ * <pre>{@code
+ * PCollection<InputT> input = ...;
+ * PCollection<Long> hllSketch = input.apply(ApproximateDistinct.<InputT>globally());
+ * }</pre>
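When the single estimated count has to be consumed by another part of the pipeline, a common pattern is to turn it into a side input. This is only a minimal sketch built on top of the example above: View is a core SDK transform, and hllSketch refers to the PCollection of Long produced just before.

  PCollectionView<Long> distinctCountView = hllSketch.apply(View.<Long>asSingleton());
  // The view can then be attached to another step via ParDo.of(...).withSideInputs(distinctCountView)
  // and read inside that DoFn with context.sideInput(distinctCountView).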
+ * + *

+ * <h4>Example 2: per key default use</h4>
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V>> input = ...;
+ * PCollection<KV<K, Long>> hllSketches = input.apply(ApproximateDistinct.<K, V>perKey());
+ * }</pre>
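If only the estimated counts are needed and the keys can be dropped afterwards, the per-key output can be post-processed with the core SDK's Values transform, as the unit tests added later in this commit do. A minimal sketch, reusing the K and V placeholders from the example above:

  PCollection<KV<K, Long>> perKeyCounts = input.apply(ApproximateDistinct.<K, V>perKey());
  // Keep only the estimated cardinalities, dropping the keys.
  PCollection<Long> countsOnly = perKeyCounts.apply(Values.<Long>create());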
+ * + *

+ * <h4>Example 3: tune precision and use sparse representation</h4>
+ *
+ * <p>One can tune the precision and sparse precision parameters in order to control the accuracy
+ * and the memory footprint. The tuning works exactly the same way for {@link #globally()} and
+ * {@link #perKey()}.
+ *
+ * <pre>{@code
+ * int precision = 15;
+ * int sparsePrecision = 25;
+ * PCollection<InputT> input = ...;
+ * PCollection<Long> hllSketch = input.apply(ApproximateDistinct.<InputT>globally()
+ *     .withPrecision(precision)
+ *     .withSparsePrecision(sparsePrecision));
+ * }</pre>
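If one prefers to start from a target relative error rather than from a precision, the precisionForRelativeError and relativeErrorForPrecision helpers defined at the bottom of this class can be used to derive p. A minimal sketch, following the same conventions as the examples above; the 1% target and the InputT placeholder are arbitrary:

  // Rule of thumb from the Parameters section: relative error is roughly 1.1 / sqrt(2^p).
  double targetRelativeError = 0.01;
  int p = (int) ApproximateDistinct.precisionForRelativeError(targetRelativeError);
  // For a 1% target this gives p = 14; relativeErrorForPrecision(14) is about 0.8% on average.
  PCollection<InputT> input = ...;
  PCollection<Long> distinctCount =
      input.apply(ApproximateDistinct.<InputT>globally().withPrecision(p));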
+ * + *

+ * <h3>Using the {@link ApproximateDistinctFn} CombineFn</h3>
+ *
+ * <p>The CombineFn does the same thing as the transforms, but it can be used in cases where you
+ * want to manipulate the {@link HyperLogLogPlus} sketch directly, for example if you want to
+ * store it in a database as a backup. It can also be used in stateful processing or in {@link
+ * org.apache.beam.sdk.transforms.CombineFns.ComposedCombineFn}.
+ *
+ * <h4>Example 1: basic use</h4>
+ *
+ * <p>This example is not very interesting in itself, but it shows how to properly create an
+ * {@link ApproximateDistinctFn}. One must always specify a coder using the {@link
+ * ApproximateDistinctFn#create(Coder)} method.
+ *
+ * <pre>{@code
+ * PCollection<Integer> input = ...;
+ * PCollection<HyperLogLogPlus> output = input.apply(Combine.globally(ApproximateDistinctFn
+ *     .create(BigEndianIntegerCoder.of())));
+ * }</pre>
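Because the combined output is a plain HyperLogLogPlus, it can be serialized for external storage (the "backup in a database" use case mentioned above) with the HyperLogLogPlusCoder defined further down in this file, together with CoderUtils from the SDK. A minimal sketch; the sketch variable is a placeholder and where the bytes are actually stored is left open:

  HyperLogLogPlus sketch = ...;  // e.g. a value taken out of the PCollection above
  // Serialize the sketch, e.g. before writing it to a key-value store
  // (both helpers throw the checked CoderException).
  byte[] backup =
      CoderUtils.encodeToByteArray(ApproximateDistinct.HyperLogLogPlusCoder.of(), sketch);
  // Later, restore it and query the cardinality again.
  HyperLogLogPlus restored =
      CoderUtils.decodeFromByteArray(ApproximateDistinct.HyperLogLogPlusCoder.of(), backup);
  long cardinality = restored.cardinality();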
+ * + *

+ * <h4>Example 2: use the {@link CombineFn} in a stateful {@link ParDo}</h4>
+ *
+ * <p>One may want to use the {@link ApproximateDistinctFn} in a stateful ParDo in order to make
+ * some processing depend on the current cardinality of the stream. <br>
+ * For more information about stateful processing, see the blog post on this topic here.
+ *
+ * <p>Here is an example of a {@link DoFn} using an {@link ApproximateDistinctFn} as a {@link
+ * org.apache.beam.sdk.state.CombiningState}:
+ *
+ * <pre>
+ * {@literal class StatefulCardinality<V> extends DoFn<V, OutputT>} {
+ *   {@literal @StateId}("hllSketch")
+ *   {@literal private final StateSpec<CombiningState<V, HyperLogLogPlus, HyperLogLogPlus>>}
+ *       indexSpec;
+ *
+ *   {@literal public StatefulCardinality(ApproximateDistinctFn<V> fn)} {
+ *     indexSpec = StateSpecs.combining(fn);
+ *   }
+ *
+ *   {@literal @ProcessElement}
+ *   public void processElement(
+ *       ProcessContext context,
+ *       {@literal @StateId}("hllSketch")
+ *       {@literal CombiningState<V, HyperLogLogPlus, HyperLogLogPlus> hllSketch}) {
+ *     long current = MoreObjects.firstNonNull(hllSketch.getAccum().cardinality(), 0L);
+ *     hllSketch.add(context.element());
+ *     context.output(...);
+ *   }
+ * }
+ * </pre>
+ *
+ * <p>Then the {@link DoFn} can be called like this:
+ *
+ * <pre>{@code
+ * PCollection<V> input = ...;
+ * ApproximateDistinctFn<V> myFn = ApproximateDistinctFn.create(input.getCoder());
+ * PCollection<OutputT> output = input.apply(ParDo.of(new StatefulCardinality<>(myFn)));
+ * }</pre>
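One caveat worth keeping in mind, not specific to this extension: Beam state is partitioned per key and window, so a stateful DoFn like the one above has to run on a keyed PCollection. A minimal sketch of the keying step, reusing the V placeholder from above with an arbitrary constant key (WithKeys is a core SDK transform, also used by the tests in this commit):

  PCollection<V> elements = ...;
  // State requires keyed input: assign a key before applying the stateful DoFn.
  PCollection<KV<Integer, V>> keyed = elements.apply(WithKeys.<Integer, V>of(1));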
+ * + *

+ * <h4>Example 3: use the {@link RetrieveCardinality} utility class</h4>
+ *
+ * <p>One may want to retrieve the cardinality as a long after doing some advanced processing with
+ * the {@link HyperLogLogPlus} structure. <br>
+ * The {@link RetrieveCardinality} utility class provides an easy way to do so:
+ *
+ * <pre>{@code
+ * PCollection<MyObject> input = ...;
+ * PCollection<HyperLogLogPlus> hll = input.apply(Combine.globally(ApproximateDistinctFn
+ *     .create(new MyObjectCoder())
+ *     .withSparseRepresentation(20)));
+ *
+ * // Some advanced processing
+ * PCollection<SomeObject> advancedResult = hll.apply(...);
+ *
+ * PCollection<Long> cardinality = hll.apply(ParDo.of(
+ *     ApproximateDistinct.RetrieveCardinality.globally()));
+ * }</pre>
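Since the accumulators are plain stream-lib HyperLogLogPlus objects, sketches can also be merged outside of the pipeline, which mirrors what mergeAccumulators() does inside the CombineFn below. A small illustrative sketch; the two input sketches are placeholders and must have been built with the same precision settings:

  HyperLogLogPlus merged = new HyperLogLogPlus(12, 0);
  try {
    merged.addAll(sketchFromMonday);   // placeholder sketches, e.g. restored from a backup
    merged.addAll(sketchFromTuesday);
  } catch (CardinalityMergeException e) {
    throw new IllegalStateException("The sketches cannot be merged: " + e.getMessage(), e);
  }
  long combinedDistinctCount = merged.cardinality();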
+ * + *

Warning: this class is experimental. Its API is subject to change in future versions of + * Beam. For example, it may be merged with the {@link + * org.apache.beam.sdk.transforms.ApproximateUnique} transform. + */ +@Experimental +public final class ApproximateDistinct { + + /** + * Computes the approximate number of distinct elements in the input {@code PCollection} + * and returns a {@code PCollection}. + * + * @param the type of the elements in the input {@link PCollection} + */ + public static GloballyDistinct globally() { + return GloballyDistinct.builder().build(); + } + + /** + * Like {@link #globally} but per key, i.e computes the approximate number of distinct values per + * key in a {@code PCollection>} and returns {@code PCollection>}. + * + * @param type of the keys mapping the elements + * @param type of the values being combined per key + */ + public static PerKeyDistinct perKey() { + return PerKeyDistinct.builder().build(); + } + + /** + * Implementation of {@link #globally()}. + * + * @param + */ + @AutoValue + public abstract static class GloballyDistinct + extends PTransform, PCollection> { + + abstract int precision(); + + abstract int sparsePrecision(); + + abstract Builder toBuilder(); + + static Builder builder() { + return new AutoValue_ApproximateDistinct_GloballyDistinct.Builder() + .setPrecision(12) + .setSparsePrecision(0); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setPrecision(int p); + + abstract Builder setSparsePrecision(int sp); + + abstract GloballyDistinct build(); + } + + public GloballyDistinct withPrecision(int p) { + return toBuilder().setPrecision(p).build(); + } + + public GloballyDistinct withSparsePrecision(int sp) { + return toBuilder().setSparsePrecision(sp).build(); + } + + @Override + public PCollection expand(PCollection input) { + return input + .apply( + "Compute HyperLogLog Structure", + Combine.globally( + ApproximateDistinctFn.create(input.getCoder()) + .withPrecision(this.precision()) + .withSparseRepresentation(this.sparsePrecision()))) + .apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.globally())); + } + } + + /** + * Implementation of {@link #perKey()}. + * + * @param + * @param + */ + @AutoValue + public abstract static class PerKeyDistinct + extends PTransform>, PCollection>> { + + abstract int precision(); + + abstract int sparsePrecision(); + + abstract Builder toBuilder(); + + static Builder builder() { + return new AutoValue_ApproximateDistinct_PerKeyDistinct.Builder() + .setPrecision(12) + .setSparsePrecision(0); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setPrecision(int p); + + abstract Builder setSparsePrecision(int sp); + + abstract PerKeyDistinct build(); + } + + public PerKeyDistinct withPrecision(int p) { + return toBuilder().setPrecision(p).build(); + } + + public PerKeyDistinct withSparsePrecision(int sp) { + return toBuilder().setSparsePrecision(sp).build(); + } + + @Override + public PCollection> expand(PCollection> input) { + KvCoder inputCoder = (KvCoder) input.getCoder(); + return input + .apply( + Combine.perKey( + ApproximateDistinctFn.create(inputCoder.getValueCoder()) + .withPrecision(this.precision()) + .withSparseRepresentation(this.sparsePrecision()))) + .apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.perKey())); + } + } + + /** + * Implements the {@link CombineFn} of {@link ApproximateDistinct} transforms. 
+ * + * @param the type of the elements in the input {@link PCollection} + */ + public static class ApproximateDistinctFn + extends CombineFn { + + private final int p; + + private final int sp; + + private final Coder inputCoder; + + private ApproximateDistinctFn(int p, int sp, Coder coder) { + this.p = p; + this.sp = sp; + inputCoder = coder; + } + + /** + * Returns an {@link ApproximateDistinctFn} combiner with the given input coder. + * + * @param coder the coder that encodes the elements' type + */ + public static ApproximateDistinctFn create(Coder coder) { + try { + coder.verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + throw new IllegalArgumentException("Coder is not deterministic ! " + e.getMessage(), e); + } + return new ApproximateDistinctFn<>(12, 0, coder); + } + + /** + * Returns a new {@link ApproximateDistinctFn} combiner with a new precision {@code p}. + * + *

+     * <p>Keep in mind that {@code p} cannot be lower than 4, because the estimation would be too
+     * inaccurate.
+     *
+     * <p>See {@link ApproximateDistinct#precisionForRelativeError(double)} and {@link
+     * ApproximateDistinct#relativeErrorForPrecision(int)} for more information about the
+     * relationship between precision and relative error.
+     *
+     * @param p the precision value for the normal representation
+     */
+    public ApproximateDistinctFn<InputT> withPrecision(int p) {
+      checkArgument(p >= 4, "Expected: p >= 4. Actual: p = %s", p);
+      return new ApproximateDistinctFn<>(p, this.sp, this.inputCoder);
+    }
+
+    /**
+     * Returns a new {@link ApproximateDistinctFn} combiner with a sparse representation of
+     * precision {@code sp}.
+     *
+     * <p>Values above 32 are not yet supported by the AddThis version of HyperLogLog+.
+     *
Fore more information about the sparse representation, read Google's paper available here. + * + * @param sp the precision of HyperLogLog+' sparse representation + */ + public ApproximateDistinctFn withSparseRepresentation(int sp) { + checkArgument( + (sp > this.p && sp < 32) || (sp == 0), + "Expected: p <= sp <= 32." + "Actual: p = %s, sp = %s", + this.p, + sp); + return new ApproximateDistinctFn<>(this.p, sp, this.inputCoder); + } + + @Override + public HyperLogLogPlus createAccumulator() { + return new HyperLogLogPlus(p, sp); + } + + @Override + public HyperLogLogPlus addInput(HyperLogLogPlus acc, InputT record) { + try { + acc.offer(CoderUtils.encodeToByteArray(inputCoder, record)); + } catch (CoderException e) { + throw new IllegalStateException("The input value cannot be encoded: " + e.getMessage(), e); + } + return acc; + } + + /** Output the whole structure so it can be queried, reused or stored easily. */ + @Override + public HyperLogLogPlus extractOutput(HyperLogLogPlus accumulator) { + return accumulator; + } + + @Override + public HyperLogLogPlus mergeAccumulators(Iterable accumulators) { + HyperLogLogPlus mergedAccum = createAccumulator(); + for (HyperLogLogPlus accum : accumulators) { + try { + mergedAccum.addAll(accum); + } catch (CardinalityMergeException e) { + // Should never happen because only HyperLogLogPlus accumulators are instantiated. + throw new IllegalStateException( + "The accumulators cannot be merged: " + e.getMessage(), e); + } + } + return mergedAccum; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("p", p).withLabel("precision")) + .add(DisplayData.item("sp", sp).withLabel("sparse representation precision")); + } + } + + /** Coder for {@link HyperLogLogPlus} class. */ + public static class HyperLogLogPlusCoder extends CustomCoder { + + private static final HyperLogLogPlusCoder INSTANCE = new HyperLogLogPlusCoder(); + + private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of(); + + public static HyperLogLogPlusCoder of() { + return INSTANCE; + } + + @Override + public void encode(HyperLogLogPlus value, OutputStream outStream) throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null HyperLogLogPlus sketch"); + } + BYTE_ARRAY_CODER.encode(value.getBytes(), outStream); + } + + @Override + public HyperLogLogPlus decode(InputStream inStream) throws IOException { + return HyperLogLogPlus.Builder.build(BYTE_ARRAY_CODER.decode(inStream)); + } + + @Override + public boolean isRegisterByteSizeObserverCheap(HyperLogLogPlus value) { + return true; + } + + @Override + protected long getEncodedElementByteSize(HyperLogLogPlus value) throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null HyperLogLogPlus sketch"); + } + return value.sizeof(); + } + } + + /** + * Utility class that provides {@link DoFn}s to retrieve the cardinality from a {@link + * HyperLogLogPlus} structure in a global or perKey context. 
+ */ + public static class RetrieveCardinality { + + public static DoFn, KV> perKey() { + return new DoFn, KV>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV kv = c.element(); + c.output(KV.of(kv.getKey(), kv.getValue().cardinality())); + } + }; + } + + public static DoFn globally() { + return new DoFn() { + @ProcessElement + public void apply(ProcessContext c) { + c.output(c.element().cardinality()); + } + }; + } + } + + /** + * Computes the precision based on the desired relative error. + * + *

+   * <p>According to the paper, the mean squared error is bounded by the following formula:
+   *
+   * <pre>b(m) / sqrt(m)
+   * Where m is the number of buckets used ({@code p = log2(m)})
+   * and {@code b(m) < 1.106} for {@code m > 16 (and p > 4)}.
+   * </pre>
+   *
+   * <p><b>WARNING:</b> <br>
+   * This does not mean the relative error in the estimation can't be higher. <br>
+   * It only means that on average the relative error will be lower than the desired relative
+   * error. <br>
+   * Nevertheless, the more elements arrive in the {@link PCollection}, the lower the variation
+   * will be.
+ * Indeed, this is like when you throw a dice millions of time: the relative frequency of each + * different result {1,2,3,4,5,6} will get closer to {@code 1/6}. + * + * @param relativeError the mean squared error should be in the interval ]0,1] + * @return the minimum precision p in order to have the desired relative error on average. + */ + public static long precisionForRelativeError(double relativeError) { + return Math.round( + Math.ceil(Math.log(Math.pow(1.106, 2.0) / Math.pow(relativeError, 2.0)) / Math.log(2))); + } + + /** + * @param p the precision i.e. the number of bits used for indexing the buckets + * @return the Mean squared error of the Estimation of cardinality to expect for the given value + * of p. + */ + public static double relativeErrorForPrecision(int p) { + if (p < 4) { + return 1.0; + } + double betaM; + switch (p) { + case 4: + betaM = 1.156; + break; + case 5: + betaM = 1.2; + break; + case 6: + betaM = 1.104; + break; + case 7: + betaM = 1.096; + break; + default: + betaM = 1.05; + break; + } + return betaM / Math.sqrt(Math.exp(p * Math.log(2))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java new file mode 100755 index 0000000..2e8d60e --- /dev/null +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utilities for computing statistical indicators using probabilistic sketches. + */ +package org.apache.beam.sdk.extensions.sketching; http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java new file mode 100644 index 0000000..cdbcc45 --- /dev/null +++ b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sketching; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; + +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.extensions.sketching.ApproximateDistinct.ApproximateDistinctFn; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Tests for {@link ApproximateDistinct}. 
*/ +@RunWith(JUnit4.class) +public class ApproximateDistinctTest implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(ApproximateDistinctTest.class); + + @Rule public final transient TestPipeline tp = TestPipeline.create(); + + @Test + public void smallCardinality() { + final int smallCard = 1000; + final int p = 6; + final double expectedErr = 1.104 / Math.sqrt(p); + + List small = new ArrayList<>(); + for (int i = 0; i < smallCard; i++) { + small.add(i); + } + + PCollection cardinality = + tp.apply("small stream", Create.of(small)) + .apply("small cardinality", ApproximateDistinct.globally().withPrecision(p)); + + PAssert.that("Not Accurate Enough", cardinality) + .satisfies(new VerifyAccuracy(smallCard, expectedErr)); + + tp.run(); + } + + @Test + public void bigCardinality() { + final int cardinality = 15000; + final int p = 15; + final int sp = 20; + final double expectedErr = 1.04 / Math.sqrt(p); + + List stream = new ArrayList<>(); + for (int i = 1; i <= cardinality; i++) { + stream.addAll(Collections.nCopies(2, i)); + } + Collections.shuffle(stream); + + PCollection res = + tp.apply("big stream", Create.of(stream)) + .apply( + "big cardinality", + ApproximateDistinct.globally().withPrecision(p).withSparsePrecision(sp)); + + PAssert.that("Verify Accuracy for big cardinality", res) + .satisfies(new VerifyAccuracy(cardinality, expectedErr)); + + tp.run(); + } + + @Test + public void perKey() { + final int cardinality = 1000; + final int p = 15; + final double expectedErr = 1.04 / Math.sqrt(p); + + List stream = new ArrayList<>(); + for (int i = 1; i <= cardinality; i++) { + stream.addAll(Collections.nCopies(2, i)); + } + Collections.shuffle(stream); + + PCollection results = + tp.apply("per key stream", Create.of(stream)) + .apply("create keys", WithKeys.of(1)) + .apply( + "per key cardinality", + ApproximateDistinct.perKey().withPrecision(p)) + .apply("extract values", Values.create()); + + PAssert.that("Verify Accuracy for cardinality per key", results) + .satisfies(new VerifyAccuracy(cardinality, expectedErr)); + + tp.run(); + } + + @Test + public void customObject() { + final int cardinality = 500; + final int p = 15; + final double expectedErr = 1.04 / Math.sqrt(p); + + Schema schema = + SchemaBuilder.record("User") + .fields() + .requiredString("Pseudo") + .requiredInt("Age") + .endRecord(); + List users = new ArrayList<>(); + for (int i = 1; i <= cardinality; i++) { + GenericData.Record newRecord = new GenericData.Record(schema); + newRecord.put("Pseudo", "User" + i); + newRecord.put("Age", i); + users.add(newRecord); + } + PCollection results = + tp.apply("Create stream", Create.of(users).withCoder(AvroCoder.of(schema))) + .apply( + "Test custom object", + ApproximateDistinct.globally().withPrecision(p)); + + PAssert.that("Verify Accuracy for custom object", results) + .satisfies(new VerifyAccuracy(cardinality, expectedErr)); + + tp.run(); + } + + @Test + public void testCoder() throws Exception { + HyperLogLogPlus hllp = new HyperLogLogPlus(12, 18); + for (int i = 0; i < 10; i++) { + hllp.offer(i); + } + CoderProperties.coderDecodeEncodeEqual( + ApproximateDistinct.HyperLogLogPlusCoder.of(), hllp); + } + + @Test + public void testDisplayData() { + final ApproximateDistinctFn fnWithPrecision = + ApproximateDistinctFn.create(BigEndianIntegerCoder.of()).withPrecision(23); + + assertThat(DisplayData.from(fnWithPrecision), hasDisplayItem("p", 23)); + assertThat(DisplayData.from(fnWithPrecision), hasDisplayItem("sp", 0)); + } + + class 
VerifyAccuracy implements SerializableFunction, Void> { + + private final int expectedCard; + + private final double expectedError; + + VerifyAccuracy(int expectedCard, double expectedError) { + this.expectedCard = expectedCard; + this.expectedError = expectedError; + } + + @Override + public Void apply(Iterable input) { + for (Long estimate : input) { + boolean isAccurate = Math.abs(estimate - expectedCard) / expectedCard < expectedError; + Assert.assertTrue( + "not accurate enough : \nExpected Cardinality : " + + expectedCard + + "\nComputed Cardinality : " + + estimate, + isAccurate); + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/javadoc/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml index 79ac933..85440ff 100644 --- a/sdks/java/javadoc/pom.xml +++ b/sdks/java/javadoc/pom.xml @@ -94,6 +94,11 @@ org.apache.beam + beam-sdks-java-extensions-sketching + + + + org.apache.beam beam-sdks-java-extensions-sorter