[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16179304#comment-16179304
]
ASF GitHub Bot commented on FLINK-7465:

Github user jparkie commented on a diff in the pull request:
https://github.com/apache/flink/pull/4652#discussion_r140833539
 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/HyperLogLog.java ---

@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggfunctions.cardinality;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Java implementation of HyperLogLog (HLL) algorithm from this paper:
+ * <p/>
+ * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
+ * <p/>
+ * HLL is an improved version of LogLog that is capable of estimating
+ * the cardinality of a set with accuracy = 1.04/sqrt(m) where
+ * m = 2^b. So we can control accuracy vs space usage by increasing
+ * or decreasing b.
+ * <p/>
+ * The main benefit of using HLL over LL is that it only requires 64%
+ * of the space that LL does to get the same accuracy.
+ * <p/>
+ * <p>
+ * Note that this implementation does not include the long range correction function
+ * defined in the original paper. Empirical evidence shows that the correction
+ * function causes more harm than good.
+ * </p>
+ */
+public class HyperLogLog implements ICardinality, Serializable {
+
+ private final RegisterSet registerSet;
+ private final int log2m;
+ private final double alphaMM;
+
+
+ /**
+ * Create a new HyperLogLog instance using the specified standard deviation.
+ *
+ * @param rsd  the relative standard deviation for the counter.
+ * smaller values create counters that require more space.
+ */
+ public HyperLogLog(double rsd) {
+ this(log2m(rsd));
+ }
+
+ private static int log2m(double rsd) {
+ return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
+ }
+
+ private static double rsd(int log2m) {
+ return 1.106 / Math.sqrt(Math.exp(log2m * Math.log(2)));
+ }
+
+ private static void validateLog2m(int log2m) {
+ if (log2m < 0  log2m > 30) {
+ throw new IllegalArgumentException("log2m argument is "
+ + log2m + " and is outside the range [0, 30]");
+ }
+ }
+
+ private static double linearCounting(int m, double v) {
+ return m * Math.log(m / v);
+ }
+
+ /**
+ * Create a new HyperLogLog instance. The log2m parameter defines the accuracy of
+ * the counter. The larger the log2m the better the accuracy.
+ * <p/>
+ * accuracy = 1.04/sqrt(2^log2m)
+ *
+ * @param log2m  the number of bits to use as the basis for the HLL instance
+ */
+ public HyperLogLog(int log2m) {
+ this(log2m, new RegisterSet(1 << log2m));
+ }
+
+ /**
+ * Creates a new HyperLogLog instance using the given registers. Used for unmarshalling
a serialized
+ * instance and for merging multiple counters together.
+ *
+ * @param registerSet  the initial values for the register set
+ */
+ public HyperLogLog(int log2m, RegisterSet registerSet) {
+ validateLog2m(log2m);
+ this.registerSet = registerSet;
+ this.log2m = log2m;
+ int m = 1 << this.log2m;
+
+ alphaMM = getAlphaMM(log2m, m);
+ }
+
+ @Override
+ public boolean offerHashed(long hashedValue) {
+ // j becomes the binary address determined by the first b log2m of x
+ // j will be between 0 and 2^log2m
+ final int j = (int) (hashedValue >>> (Long.SIZE  log2m));
+ final int r = Long.numberOfLeadingZeros((hashedValue << this.log2m)  (1 <<
(this.log2m  1)) + 1) + 1;
+ return registerSet.updateIfGreater(j, r);
+ }
+
+ @Override
+ public boolean offerHashed(int hashedValue) {
+ // j becomes the binary address determined by the first b log2m of x
+ // j will be between 0 and 2^log2m
+ final int j = hashedValue >>> (Integer.SIZE  log2m);
+ final int r = Integer.numberOfLeadingZeros((hashedValue << this.log2m)  (1 <<
(this.log2m  1)) + 1) + 1;
+ return registerSet.updateIfGreater(j, r);
+ }
+
+ @Override
+ public boolean offer(Object o) {
+ final int x = MurmurHash.hash(o);
+ return offerHashed(x);
+ }
+
+ @Override
+ public long cardinality() {
+ double registerSum = 0;
+ int count = registerSet.count;
+ double zeros = 0.0;
+ for (int j = 0; j < registerSet.count; j++) {
+ int val = registerSet.get(j);
+ registerSum += 1.0 / (1 << val);
+ if (val == 0) {
+ zeros++;
+ }
+ }
+
+ double estimate = alphaMM * (1 / registerSum);
+
+ if (estimate <= (5.0 / 2.0) * count) {
+ // Small Range Estimate
+ return Math.round(linearCounting(count, zeros));
+ } else {
+ return Math.round(estimate);
+ }
+ }
+
+ @Override
+ public int sizeof() {
+ return registerSet.size * 4;
+ }
+
+ @Override
+ public byte[] getBytes() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(baos);
+ writeBytes(dos);
+
+ return baos.toByteArray();
+ }
+
+ private void writeBytes(DataOutput serializedByteStream) throws IOException {
+ serializedByteStream.writeInt(log2m);
+ serializedByteStream.writeInt(registerSet.size * 4);
+ for (int x : registerSet.readOnlyBits()) {
+ serializedByteStream.writeInt(x);
+ }
+ }
+
+ /**
+ * Add all the elements of the other set to this set.
+ * <p/>
+ * This operation does not imply a loss of precision.
+ *
+ * @param other A compatible Hyperloglog instance (same log2m)
+ * @throws Exception if other is not compatible
+ */
+ public void addAll(HyperLogLog other) throws Exception {
+ if (this.sizeof() != other.sizeof()) {
+ throw new Exception("Cannot merge estimators of different sizes");
+ }
+
+ registerSet.merge(other.registerSet);
+ }
+
+ @Override
+ public ICardinality merge(ICardinality... estimators) throws Exception {
+ HyperLogLog merged = new HyperLogLog(log2m, new RegisterSet(this.registerSet.count));
+ merged.addAll(this);
+
+ if (estimators == null) {
+ return merged;
+ }
+
+ for (ICardinality estimator : estimators) {
+ if (!(estimator instanceof HyperLogLog)) {
+ throw new Exception("Cannot merge estimators of different class");
+ }
+ HyperLogLog hll = (HyperLogLog) estimator;
+ merged.addAll(hll);
+ }
+
+ return merged;
+ }
+
+ private Object writeReplace() {
+ return new SerializationHolder(this);
+ }
+
+ /**
+ * This class exists to support Externalizable semantics for
+ * HyperLogLog objects without having to expose a public
+ * constructor, public write/read methods, or pretend final
+ * fields aren't final.
+ *
+ * <p>
+ * In short, Externalizable allows you to skip some of the more
+ * verbose metadata default Serializable gets you, but still
+ * includes the class name. In that sense, there is some cost
+ * to this holder object because it has a longer class name. I
+ * imagine people who care about optimizing for that have their
+ * own workaround for long class names in general, or just use
+ * a custom serialization framework. Therefore we make no attempt
+ * to optimize that here (eg. by raising this from an inner class
+ * and giving it an unhelpful name).
+ * </p>
+ */
+ private static class SerializationHolder implements Externalizable {
+
+ HyperLogLog hyperLogLogHolder;
+
+ public SerializationHolder(HyperLogLog hyperLogLogHolder) {
+ this.hyperLogLogHolder = hyperLogLogHolder;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ hyperLogLogHolder.writeBytes(out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+ hyperLogLogHolder = Builder.build(in);
+ }
+
+ private Object readResolve() {
+ return hyperLogLogHolder;
+ }
+ }
+
+ /**
+ * Build a HyperLogLog instance.
+ */
+ public static class Builder implements Serializable {
--- End diff ---
Do we really need a builder when there is only one modifiable parameter? Why pay for
an instance of a Builder when you can have normal static factory methods? We can argue the
JIT may make it irrelevant, but it's hard to be definitive about the JIT.
> Add buildin BloomFilterCount on TableAPI&SQL
> 
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
> Issue Type: Subtask
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA. use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also be k different
hash functions defined, each of which maps or hashes some set element to one of the m array
positions, generating a uniform random distribution. Typically, k is a constant, much smaller
than m, which is proportional to the number of elements to be added; the precise choice of
k and the constant of proportionality of m are determined by the intended false positive rate
of the filter.
> To add an element, feed it to each of the k hash functions to get k array positions.
Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of the k hash
functions to get k array positions. If any of the bits at these positions is 0, the element
is definitely not in the set – if it were, then all the bits would have been set to 1 when
it was inserted. If all are 1, then either the element is in the set, or the bits have by
chance been set to 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored arrows show
the positions in the bit array that each set element is mapped to. The element w is not in
the set {x, y, z}, because it hashes to one bitarray position containing 0. For this figure,
m = 18 and k = 3. The sketch as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr] I appreciated if you can give me some advice. :)

This message was sent by Atlassian JIRA
(v6.4.14#64029)
