hivemall-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From takuti <...@git.apache.org>
Subject [GitHub] incubator-hivemall pull request #125: [HIVEMALL-18] approx_distinct_count UD...
Date Fri, 24 Nov 2017 07:08:21 GMT
Github user takuti commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/125#discussion_r152906912
  
    --- Diff: core/src/main/java/hivemall/sketch/hll/ApproxCountDistinctUDAF.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package hivemall.sketch.hll;
    +
    +import hivemall.UDAFEvaluatorWithOptions;
    +import hivemall.utils.hadoop.HiveUtils;
    +import hivemall.utils.lang.Preconditions;
    +import hivemall.utils.lang.Primitives;
    +
    +import java.io.IOException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Options;
    +import org.apache.hadoop.hive.ql.exec.Description;
    +import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    +import org.apache.hadoop.hive.ql.metadata.HiveException;
    +import org.apache.hadoop.hive.ql.parse.SemanticException;
    +import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
    +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer;
    +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationType;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.io.LongWritable;
    +
    +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
    +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
    +
    +@Description(name = "approx_count_distinct", value = "_FUNC_(expr x [, const string options])"
    +        + " - Returns an approximation of count(DISTINCT x) using HyperLogLogPlus algorithm")
    +public final class ApproxCountDistinctUDAF extends AbstractGenericUDAFResolver {
    +
    +    @Override
    +    public GenericUDAFEvaluator getEvaluator(@Nonnull TypeInfo[] typeInfo)
    +            throws SemanticException {
    +        if (typeInfo.length != 1 && typeInfo.length != 2) {
    +            throw new UDFArgumentTypeException(typeInfo.length - 1,
    +                "_FUNC_ takes one or two arguments");
    +        }
    +        if (typeInfo.length == 2 && !HiveUtils.isStringTypeInfo(typeInfo[1]))
{
    +            throw new UDFArgumentTypeException(1,
    +                "The second argument type expected to be const string: " + typeInfo[1]);
    +        }
    +
    +        return new HLLEvaluator();
    +    }
    +
    +    public static final class HLLEvaluator extends UDAFEvaluatorWithOptions {
    +
    +        @Nullable
    +        private int[] params;
    +
    +        private ObjectInspector origInputOI;
    +        private BinaryObjectInspector mergeInputOI;
    +
    +        @Override
    +        protected Options getOptions() {
    +            Options opts = new Options();
    +            opts.addOption("p", true,
    +                "The size of registers for the normal set. `p` MUST be in the range [4,sp]
and 15 by the default");
    +            opts.addOption("sp", true,
    +                "The size of registers for the sparse set. `sp` MUST be in the range
[4,32] and 25 by the default");
    +            return opts;
    +        }
    +
    +        @Override
    +        protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs)
    +                throws UDFArgumentException {
    +            CommandLine cl = null;
    +
    +            int p = 15, sp = 25;
    +            if (argOIs.length == 2) {
    +                if (!HiveUtils.isConstString(argOIs[1])) {
    +                    throw new UDFArgumentException(
    +                        "The second argument type expected to be const string: " + argOIs[1]);
    +                }
    +                cl = parseOptions(HiveUtils.getConstString(argOIs[1]));
    +
    +                p = Primitives.parseInt(cl.getOptionValue("p"), p);
    +                sp = Primitives.parseInt(cl.getOptionValue("sp"), sp);
    +                validateArguments(p, sp);
    +            }
    +
    +            this.params = new int[] {p, sp};
    +
    +            return cl;
    +        }
    +
    +        @Override
    +        public ObjectInspector init(@Nonnull Mode mode, @Nonnull ObjectInspector[] parameters)
    +                throws HiveException {
    +            assert (parameters.length == 1 || parameters.length == 2) : parameters.length;
    +            super.init(mode, parameters);
    +
    +            // initialize input
    +            if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {// from original data
    +                processOptions(parameters);
    +                this.origInputOI = parameters[0];
    +            } else {// from partial aggregation               
    +                this.mergeInputOI = HiveUtils.asBinaryOI(parameters[0]);
    +            }
    +
    +            // initialize output
    +            final ObjectInspector outputOI;
    +            if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {// terminatePartial
               
    +                outputOI = PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector;
    +            } else {// terminate
    +                outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    +            }
    +            return outputOI;
    +        }
    +
    +        @Override
    +        public HLLBuffer getNewAggregationBuffer() throws HiveException {
    +            HLLBuffer buf = new HLLBuffer();
    +            if (params != null) {
    +                buf.reset(params[0], params[1]);
    +            }
    +            return buf;
    +        }
    +
    +        @SuppressWarnings("deprecation")
    +        @Override
    +        public void reset(@Nonnull AggregationBuffer agg) throws HiveException {
    +            HLLBuffer buf = (HLLBuffer) agg;
    +            if (params != null) {
    +                buf.reset(params[0], params[1]);
    +            } else {
    +                buf.hll = null;
    +            }
    +        }
    +
    +        @SuppressWarnings("deprecation")
    +        @Override
    +        public void iterate(@Nonnull AggregationBuffer agg, @Nonnull Object[] parameters)
    +                throws HiveException {
    +            if (parameters[0] == null) {
    +                return;
    +            }
    +
    +            HLLBuffer buf = (HLLBuffer) agg;
    +            Object value =
    +                    ObjectInspectorUtils.copyToStandardJavaObject(parameters[0], origInputOI);
    +            Preconditions.checkNotNull(buf.hll, HiveException.class);
    +            buf.hll.offer(value);
    +        }
    +
    +        @SuppressWarnings("deprecation")
    +        @Override
    +        @Nullable
    +        public byte[] terminatePartial(@Nonnull AggregationBuffer agg) throws HiveException
{
    +            HLLBuffer buf = (HLLBuffer) agg;
    +            if (buf.hll == null) {
    +                return null;
    +            }
    +            try {
    +                return buf.hll.getBytes();
    +            } catch (IOException e) {
    +                throw new HiveException(e);
    +            }
    +        }
    +
    +        @SuppressWarnings("deprecation")
    +        @Override
    +        public void merge(@Nonnull AggregationBuffer agg, @Nullable Object partial)
    +                throws HiveException {
    +            if (partial == null) {
    +                return;
    +            }
    +
    +            byte[] data = mergeInputOI.getPrimitiveJavaObject(partial);
    +            final HyperLogLogPlus otherHLL;
    +            try {
    +                otherHLL = HyperLogLogPlus.Builder.build(data);
    +            } catch (IOException e) {
    +                throw new HiveException("Failed to build other HLL");
    +            }
    +
    +            final HLLBuffer buf = (HLLBuffer) agg;
    +            if (buf.hll == null) {
    +                buf.hll = otherHLL;
    +            } else {
    +                try {
    +                    buf.hll.addAll(otherHLL);
    +                } catch (CardinalityMergeException e) {
    +                    throw new HiveException("Failed to merge HLL");
    +                }
    +            }
    +        }
    +
    +        @SuppressWarnings("deprecation")
    +        @Override
    +        public LongWritable terminate(@Nonnull AggregationBuffer agg) throws HiveException
{
    +            HLLBuffer buf = (HLLBuffer) agg;
    +
    +            long cardinarity = (buf.hll == null) ? 0L : buf.hll.cardinality();
    --- End diff --
    
    @myui just noticed: s/cardinarity/cardinality/ — also in the documentation
    
    fix sometime in the future


---

Mime
View raw message