Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 05C3C185C6 for ; Thu, 25 Jun 2015 08:17:12 +0000 (UTC) Received: (qmail 50681 invoked by uid 500); 25 Jun 2015 08:17:11 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 50647 invoked by uid 500); 25 Jun 2015 08:17:11 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 50638 invoked by uid 99); 25 Jun 2015 08:17:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Jun 2015 08:17:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B2F37DFDD0; Thu, 25 Jun 2015 08:17:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: jihoonson@apache.org To: commits@tajo.apache.org Message-Id: <6af983a10cfc4e7c8a3d3d90a9ed1780@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tajo git commit: TAJO-1661: Implement CORR function. (jihoon) Date: Thu, 25 Jun 2015 08:17:11 +0000 (UTC) Repository: tajo Updated Branches: refs/heads/master 2ec307d62 -> aa49dc4a8 TAJO-1661: Implement CORR function. 
(jihoon) Closes #616 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/aa49dc4a Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/aa49dc4a Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/aa49dc4a Branch: refs/heads/master Commit: aa49dc4a8af9f836e44896c516d8b0cdb738e5dd Parents: 2ec307d Author: Jihoon Son Authored: Thu Jun 25 17:16:41 2015 +0900 Committer: Jihoon Son Committed: Thu Jun 25 17:16:41 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/engine/function/builtin/Corr.java | 224 +++++++++++++++++++ tajo-core/src/main/proto/InternalTypes.proto | 9 + .../engine/function/TestBuiltinFunctions.java | 33 +++ 4 files changed, 268 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 425ac5d..9412179 100644 --- a/CHANGES +++ b/CHANGES @@ -4,6 +4,8 @@ Release 0.11.0 - unreleased NEW FEATURES + TAJO-1661: Implement CORR function. (jihoon) + TAJO-1537: Implement a virtual table for sessions. (Contributed by Yongjin Choi, Committed by hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java new file mode 100644 index 0000000..310169f --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.function.builtin; + +import org.apache.tajo.InternalTypes.CorrProto; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.engine.function.annotation.Description; +import org.apache.tajo.engine.function.annotation.ParamTypes; +import org.apache.tajo.plan.function.AggFunction; +import org.apache.tajo.plan.function.FunctionContext; +import org.apache.tajo.storage.Tuple; + +/** + * Compute the Pearson correlation coefficient corr(x, y), using the following + * stable one-pass method, based on: + * "Formulas for Robust, One-Pass Parallel Computation of Covariances and + * Arbitrary-Order Statistical Moments", Philippe Pebay, Sandia Labs + * and "The Art of Computer Programming, volume 2: Seminumerical Algorithms", + * Donald Knuth.
+ * + * Incremental: + * n : count + * mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : xavg + * my_n = my_(n-1) + [y_n - my_(n-1)]/n : yavg + * c_n = c_(n-1) + (x_n - mx_(n-1))*(y_n - my_n) : covar (n times the covariance) + * vx_n = vx_(n-1) + (x_n - mx_n)(x_n - mx_(n-1)): xvar (n times the variance of x) + * vy_n = vy_(n-1) + (y_n - my_n)(y_n - my_(n-1)): yvar (n times the variance of y) + * + * Merge: + * c_(A,B) = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B) + * vx_(A,B) = vx_A + vx_B + (mx_A - mx_B)*(mx_A - mx_B)*n_A*n_B/(n_A+n_B) + * vy_(A,B) = vy_A + vy_B + (my_A - my_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B) + * + */ +@Description( + functionName = "corr", + example = "> SELECT corr(expr, expr);", + description = "Returns the Pearson coefficient of correlation between a set of number pairs.\n" + + "The function takes as arguments any pair of numeric types and returns a double.\n" + + "Any pair with a NULL is ignored. If the function is applied to an empty set or\n" + + "a singleton set, or if either x or y has zero variance, NULL will be returned. Otherwise, it computes the following:\n" + + " COVAR_POP(x,y)/(STDDEV_POP(x)*STDDEV_POP(y))\n" + + "where neither x nor y is null,\n" + + "COVAR_POP is the population covariance,\n" + + "and STDDEV_POP is the population standard deviation.", + returnType = Type.FLOAT8, + paramTypes = { + @ParamTypes(paramTypes = {Type.INT8, Type.INT8}), + @ParamTypes(paramTypes = {Type.INT8, Type.INT4}), + @ParamTypes(paramTypes = {Type.INT4, Type.INT8}), + @ParamTypes(paramTypes = {Type.INT4, Type.INT4}), + @ParamTypes(paramTypes = {Type.INT8, Type.FLOAT8}), + @ParamTypes(paramTypes = {Type.INT8, Type.FLOAT4}), + @ParamTypes(paramTypes = {Type.INT4, Type.FLOAT8}), + @ParamTypes(paramTypes = {Type.INT4, Type.FLOAT4}), + @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT8}), + @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT4}), + @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT8}), + @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT4}), + @ParamTypes(paramTypes = {Type.FLOAT8, Type.FLOAT8}), + @ParamTypes(paramTypes = {Type.FLOAT4, Type.FLOAT8}), + @ParamTypes(paramTypes = {Type.FLOAT8,
Type.FLOAT4}), + @ParamTypes(paramTypes = {Type.FLOAT4, Type.FLOAT4}), + } +) +public class Corr extends AggFunction { + + /** + * Creates a corr(x, y) instance whose two arguments are declared as FLOAT8. + * Values are aggregated with the stable one-pass algorithm of Philippe Pébay and Donald Knuth: + * + * Incremental: + * n : count + * mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : xavg + * my_n = my_(n-1) + [y_n - my_(n-1)]/n : yavg + * c_n = c_(n-1) + (x_n - mx_(n-1))*(y_n - my_n) : covar (n times the covariance) + * vx_n = vx_(n-1) + (x_n - mx_n)(x_n - mx_(n-1)): xvar (n times the variance of x) + * vy_n = vy_(n-1) + (y_n - my_n)(y_n - my_(n-1)): yvar (n times the variance of y) + * + * Merge: + * c_(A,B) = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B) + * vx_(A,B) = vx_A + vx_B + (mx_A - mx_B)*(mx_A - mx_B)*n_A*n_B/(n_A+n_B) + * vy_(A,B) = vy_A + vy_B + (my_A - my_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B) + * + */ + public Corr() { + super(new Column[] { + new Column("expr", Type.FLOAT8), + new Column("expr", Type.FLOAT8) + }); + } + + public Corr(Column[] definedArgs) { + super(definedArgs); + } + + @Override + public FunctionContext newContext() { + return new CorrContext(); + } + + @Override + public void eval(FunctionContext ctx, Tuple params) { + if (!params.isBlankOrNull(0) && !params.isBlankOrNull(1)) { + CorrContext corrContext = (CorrContext) ctx; + double vx = params.getFloat8(0); + double vy = params.getFloat8(1); + double deltaX = vx - corrContext.xavg; + double deltaY = vy - corrContext.yavg; + corrContext.count++; + corrContext.xavg += deltaX / corrContext.count; + corrContext.yavg += deltaY / corrContext.count; + if (corrContext.count > 1) { + corrContext.covar += deltaX * (vy - corrContext.yavg); + corrContext.xvar += deltaX * (vx - corrContext.xavg); + corrContext.yvar += deltaY * (vy - corrContext.yavg); + } + } + } + + @Override + public void merge(FunctionContext ctx, Tuple part) { + CorrContext corrContext = (CorrContext) ctx; + if (part.isBlankOrNull(0)) { + return; + } + ProtobufDatum datum = (ProtobufDatum) part.getProtobufDatum(0); + CorrProto proto = (CorrProto) datum.get(); + long nA =
corrContext.count; + long nB = proto.getCount(); + + if (nA == 0) { + corrContext.count = proto.getCount(); + corrContext.xavg = proto.getXavg(); + corrContext.yavg = proto.getYavg(); + corrContext.xvar = proto.getXvar(); + corrContext.yvar = proto.getYvar(); + corrContext.covar = proto.getCovar(); + } else { + // Merge the two partials; nA * nB is always evaluated in double so it cannot overflow a long + double xavgA = corrContext.xavg; + double yavgA = corrContext.yavg; + double xavgB = proto.getXavg(); + double yavgB = proto.getYavg(); + double xvarB = proto.getXvar(); + double yvarB = proto.getYvar(); + double covarB = proto.getCovar(); + + corrContext.count += nB; + corrContext.xavg = (xavgA * nA + xavgB * nB) / corrContext.count; + corrContext.yavg = (yavgA * nA + yavgB * nB) / corrContext.count; + corrContext.xvar += xvarB + (xavgA - xavgB) * (xavgA - xavgB) * nA * nB / corrContext.count; + corrContext.yvar += yvarB + (yavgA - yavgB) * (yavgA - yavgB) * nA * nB / corrContext.count; + corrContext.covar += + covarB + (xavgA - xavgB) * (yavgA - yavgB) * ((double) nA * nB / corrContext.count); + } + } + + @Override + public Datum getPartialResult(FunctionContext ctx) { + CorrContext corrContext = (CorrContext) ctx; + if (corrContext.count == 0) { + return NullDatum.get(); + } + CorrProto.Builder builder = CorrProto.newBuilder(); + builder.setCount(corrContext.count); + builder.setXavg(corrContext.xavg); + builder.setYavg(corrContext.yavg); + builder.setXvar(corrContext.xvar); + builder.setYvar(corrContext.yvar); + builder.setCovar(corrContext.covar); + return new ProtobufDatum(builder.build()); + } + + @Override + public DataType getPartialResultType() { + return CatalogUtil.newDataType(Type.PROTOBUF, CorrProto.class.getName()); + } + + @Override + public Datum terminate(FunctionContext ctx) { + CorrContext corrContext = (CorrContext) ctx; + // SQL standard - null for zero or one pair, or when x or y has zero variance (would be NaN/Infinity) + if (corrContext.count < 2 || corrContext.xvar <= 0 || corrContext.yvar <= 0) { + return NullDatum.get(); + } else { + return DatumFactory.createFloat8(corrContext.covar + /
java.lang.Math.sqrt(corrContext.xvar) + / java.lang.Math.sqrt(corrContext.yvar)); + } + } + + protected static class CorrContext implements FunctionContext { + long count = 0; // number n of elements + double xavg = 0; // average of x elements + double yavg = 0; // average of y elements + double xvar = 0; // n times the variance of x elements + double yvar = 0; // n times the variance of y elements + double covar = 0; // n times the covariance + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/main/proto/InternalTypes.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/InternalTypes.proto b/tajo-core/src/main/proto/InternalTypes.proto index 13dd107..d23e244 100644 --- a/tajo-core/src/main/proto/InternalTypes.proto +++ b/tajo-core/src/main/proto/InternalTypes.proto @@ -36,3 +36,12 @@ message VarianceProto { required double avg = 2; required int64 count = 3; } + +message CorrProto { + required int64 count = 1; + required double xavg = 2; + required double yavg = 3; + required double xvar = 4; + required double yvar = 5; + required double covar = 6; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java index 5dae452..72fdd6f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java @@ -788,4 +788,37 @@ public class TestBuiltinFunctions extends QueryTestCaseBase { executeString("DROP TABLE rank_table2 PURGE"); } } + + @Test + public void testCorr() throws Exception { + KeyValueSet tableOptions =
new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", TajoDataTypes.Type.INT4); + schema.addColumn("value_int", TajoDataTypes.Type.INT4); + schema.addColumn("value_long", TajoDataTypes.Type.INT8); + schema.addColumn("value_float", TajoDataTypes.Type.FLOAT4); + schema.addColumn("value_double", TajoDataTypes.Type.FLOAT8); + String[] data = new String[]{ + "1|\\N|-111|1.2|-50.5", + "2|1|\\N|\\N|52.5", + "3|2|-333|2.8|\\N", + "4|3|-555|2.8|43.2", + "5|4|-111|1.1|10.2",}; + TajoTestingCluster.createTable("testbuiltin11", schema, tableOptions, data, 1); + + try { + ResultSet res = executeString("select corr(value_int, value_long) as corr1, corr(value_long, value_float) as corr2, corr(value_float, value_double) as corr3, corr(value_double, value_int) as corr4 from testbuiltin11"); + String ascExpected = "corr1,corr2,corr3,corr4\n" + + "-------------------------------\n" + + "0.5,-0.9037045658322675,0.7350290063698216,-0.8761489936497805\n"; + + assertEquals(ascExpected, resultSetToString(res)); + res.close(); + } finally { + executeString("DROP TABLE testbuiltin11 PURGE"); + } + } }