From: vasia@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Fri, 18 Mar 2016 13:48:10 -0000
In-Reply-To: <23694d950bbb4ace842cfa98be8f6e58@git.apache.org>
Subject: [16/50] [abbrv] flink git commit: [FLINK-3226] implement GroupReduce translation; enable tests for supported operations

[FLINK-3226] implement GroupReduce translation; enable tests for supported operations

Squashes the following commits:

- Compute average as sum and count for byte, short and int type to avoid rounding errors
- Move aggregation functions to org.apache.flink.table.runtime
- Remove join-related changes
- Change integer average aggregations to maintain sum and count
- Long average uses a BigInteger sum

Project: 
http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7972426e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7972426e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7972426e Branch: refs/heads/master Commit: 7972426e40b50bd963f6f895b9755c7408baff5a Parents: b8028db Author: vasia Authored: Thu Feb 4 15:53:52 2016 +0100 Committer: vasia Committed: Fri Mar 18 14:44:49 2016 +0100 ---------------------------------------------------------------------- .../flink/api/table/plan/TypeConverter.scala | 2 +- .../plan/functions/AggregateFunction.scala | 71 --------- .../table/plan/functions/FunctionUtils.scala | 37 ----- .../plan/functions/aggregate/Aggregate.scala | 42 ------ .../functions/aggregate/AggregateFactory.scala | 135 ----------------- .../plan/functions/aggregate/AvgAggregate.scala | 148 ------------------- .../functions/aggregate/CountAggregate.scala | 34 ----- .../plan/functions/aggregate/MaxAggregate.scala | 136 ----------------- .../plan/functions/aggregate/MinAggregate.scala | 136 ----------------- .../plan/functions/aggregate/SumAggregate.scala | 130 ---------------- .../plan/nodes/dataset/DataSetGroupReduce.scala | 30 +++- .../table/plan/nodes/dataset/DataSetJoin.scala | 6 +- .../plan/nodes/logical/FlinkAggregate.scala | 16 -- .../api/table/plan/rules/FlinkRuleSets.scala | 3 +- .../rules/dataset/DataSetAggregateRule.scala | 13 +- .../plan/rules/dataset/DataSetJoinRule.scala | 102 +------------ .../api/table/runtime/AggregateFunction.scala | 76 ++++++++++ .../api/table/runtime/aggregate/Aggregate.scala | 42 ++++++ .../runtime/aggregate/AggregateFactory.scala | 136 +++++++++++++++++ .../table/runtime/aggregate/AvgAggregate.scala | 131 ++++++++++++++++ .../runtime/aggregate/CountAggregate.scala | 34 +++++ .../table/runtime/aggregate/MaxAggregate.scala | 84 +++++++++++ .../table/runtime/aggregate/MinAggregate.scala | 86 +++++++++++ .../table/runtime/aggregate/SumAggregate.scala | 50 
+++++++ .../api/java/table/test/AggregationsITCase.java | 13 +- .../api/java/table/test/ExpressionsITCase.java | 2 - .../flink/api/java/table/test/FilterITCase.java | 2 - .../table/test/GroupedAggregationsITCase.java | 6 +- .../flink/api/java/table/test/SelectITCase.java | 2 - .../flink/api/java/table/test/UnionITCase.java | 1 - .../scala/table/test/AggregationsITCase.scala | 11 +- .../scala/table/test/ExpressionsITCase.scala | 1 - .../table/test/GroupedAggregationsITCase.scala | 6 +- 33 files changed, 704 insertions(+), 1020 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala index b7cb200..1fc482a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala @@ -135,7 +135,7 @@ object TypeConverter { logicalFieldTypes.head } else { - new TupleTypeInfo[Any](logicalFieldTypes.toArray:_*) + new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala deleted file mode 100644 index 4abf2d2..0000000 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.functions - -import java.lang.Iterable - -import com.google.common.base.Preconditions -import org.apache.flink.api.common.functions.RichGroupReduceFunction -import org.apache.flink.api.table.plan.functions.aggregate.Aggregate -import org.apache.flink.configuration.Configuration -import org.apache.flink.util.Collector - -import scala.collection.JavaConversions._ - -/** - * A wrapper Flink GroupReduceOperator UDF of aggregates, it takes the grouped data as input, - * feed to the aggregates, and collect the record with aggregated value. - * - * @param aggregates Sql aggregate functions. - * @param fields The grouped keys' index. 
- */ -class AggregateFunction( - private val aggregates: Array[Aggregate[_ <: Any]], - private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] { - - override def open(config: Configuration) { - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(fields) - Preconditions.checkArgument(aggregates.size == fields.size) - - aggregates.foreach(_.initiateAggregate) - } - - override def reduce(records: Iterable[Any], out: Collector[Any]): Unit = { - var currentValue: Any = null - - // iterate all input records, feed to each aggregate. - val aggregateAndField = aggregates.zip(fields) - records.foreach { - value => - currentValue = value - aggregateAndField.foreach { - case (aggregate, field) => - aggregate.aggregate(FunctionUtils.getFieldValue(value, field)) - } - } - - // reuse the latest record, and set all the aggregated values. - aggregateAndField.foreach { - case (aggregate, field) => - FunctionUtils.putFieldValue(currentValue, field, aggregate.getAggregated()) - } - - out.collect(currentValue) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala deleted file mode 100644 index 9d62b7c..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.functions - -import org.apache.flink.api.table.Row - -object FunctionUtils { - - def getFieldValue(record: Any, fieldIndex: Int): Any = { - record match { - case row: Row => row.productElement(fieldIndex) - case _ => throw new UnsupportedOperationException("Do not support types other than Row now.") - } - } - - def putFieldValue(record: Any, fieldIndex: Int, fieldValue: Any): Unit = { - record match { - case row: Row => row.setField(fieldIndex, fieldValue) - case _ => throw new UnsupportedOperationException("Do not support types other than Row now.") - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala deleted file mode 100644 index 5800d5f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.functions.aggregate - -/** - * Represent a Sql aggregate function, user should initiate the aggregate at first, then feed it - * with grouped aggregate field values, and get aggregated value finally. - * @tparam T - */ -trait Aggregate[T] { - /** - * Initiate current aggregate state. - */ - def initiateAggregate - - /** - * Feed the aggregate field value. - * @param value - */ - def aggregate(value: Any) - - /** - * Return final aggregated value. 
- * @return - */ - def getAggregated(): T -} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala deleted file mode 100644 index a95a163..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.plan.functions.aggregate - -import java.util - -import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.sql.SqlAggFunction -import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.calcite.sql.`type`.SqlTypeName._ -import org.apache.calcite.sql.fun._ -import org.apache.flink.api.common.functions.RichGroupReduceFunction -import org.apache.flink.api.table.plan.PlanGenException -import org.apache.flink.api.table.plan.functions.AggregateFunction - -object AggregateFactory { - - def createAggregateInstance(aggregateCalls: Seq[AggregateCall]): - RichGroupReduceFunction[Any, Any] = { - - val fieldIndexes = new Array[Int](aggregateCalls.size) - val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size) - aggregateCalls.zipWithIndex.map { case (aggregateCall, index) => - val sqlType = aggregateCall.getType - val argList: util.List[Integer] = aggregateCall.getArgList - // currently assume only aggregate on singleton field. 
- if (argList.isEmpty) { - if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) { - fieldIndexes(index) = 0 - } else { - throw new PlanGenException("Aggregate fields should not be empty.") - } - } else { - fieldIndexes(index) = argList.get(0); - } - aggregateCall.getAggregation match { - case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => { - sqlType.getSqlTypeName match { - case TINYINT => - aggregates(index) = new TinyIntSumAggregate - case SMALLINT => - aggregates(index) = new SmallIntSumAggregate - case INTEGER => - aggregates(index) = new IntSumAggregate - case BIGINT => - aggregates(index) = new LongSumAggregate - case FLOAT => - aggregates(index) = new FloatSumAggregate - case DOUBLE => - aggregates(index) = new DoubleSumAggregate - case sqlType: SqlTypeName => - throw new PlanGenException("Sum aggregate does no support type:" + sqlType) - } - } - case _: SqlAvgAggFunction => { - sqlType.getSqlTypeName match { - case TINYINT => - aggregates(index) = new TinyIntAvgAggregate - case SMALLINT => - aggregates(index) = new SmallIntAvgAggregate - case INTEGER => - aggregates(index) = new IntAvgAggregate - case BIGINT => - aggregates(index) = new LongAvgAggregate - case FLOAT => - aggregates(index) = new FloatAvgAggregate - case DOUBLE => - aggregates(index) = new DoubleAvgAggregate - case sqlType: SqlTypeName => - throw new PlanGenException("Avg aggregate does no support type:" + sqlType) - } - } - case sqlMinMaxFunction: SqlMinMaxAggFunction => { - if (sqlMinMaxFunction.isMin) { - sqlType.getSqlTypeName match { - case TINYINT => - aggregates(index) = new TinyIntMinAggregate - case SMALLINT => - aggregates(index) = new SmallIntMinAggregate - case INTEGER => - aggregates(index) = new IntMinAggregate - case BIGINT => - aggregates(index) = new LongMinAggregate - case FLOAT => - aggregates(index) = new FloatMinAggregate - case DOUBLE => - aggregates(index) = new DoubleMinAggregate - case sqlType: SqlTypeName => - throw new 
PlanGenException("Min aggregate does no support type:" + sqlType) - } - } else { - sqlType.getSqlTypeName match { - case TINYINT => - aggregates(index) = new TinyIntMaxAggregate - case SMALLINT => - aggregates(index) = new SmallIntMaxAggregate - case INTEGER => - aggregates(index) = new IntMaxAggregate - case BIGINT => - aggregates(index) = new LongMaxAggregate - case FLOAT => - aggregates(index) = new FloatMaxAggregate - case DOUBLE => - aggregates(index) = new DoubleMaxAggregate - case sqlType: SqlTypeName => - throw new PlanGenException("Max aggregate does no support type:" + sqlType) - } - } - } - case _: SqlCountAggFunction => - aggregates(index) = new CountAggregate - case unSupported: SqlAggFunction => - throw new PlanGenException("unsupported Function: " + unSupported.getName) - } - } - - new AggregateFunction(aggregates, fieldIndexes) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala deleted file mode 100644 index e9c5f8f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.functions.aggregate - -abstract class AvgAggregate[T] extends Aggregate[T] { - -} - -// TinyInt average aggregate return Int as aggregated value. -class TinyIntAvgAggregate extends AvgAggregate[Int] { - private var avgValue: Double = 0 - private var count: Int = 0 - - override def initiateAggregate: Unit = { - avgValue = 0 - count = 0 - } - - override def aggregate(value: Any): Unit = { - count += 1 - val current = value.asInstanceOf[Byte] - avgValue += (current - avgValue) / count - } - - override def getAggregated(): Int = { - avgValue.toInt - } -} - -// SmallInt average aggregate return Int as aggregated value. -class SmallIntAvgAggregate extends AvgAggregate[Int] { - private var avgValue: Double = 0 - private var count: Int = 0 - - override def initiateAggregate: Unit = { - avgValue = 0 - count = 0 - } - - override def aggregate(value: Any): Unit = { - count += 1 - val current = value.asInstanceOf[Short] - avgValue += (current - avgValue) / count - } - - override def getAggregated(): Int = { - avgValue.toInt - } -} - -// Int average aggregate return Int as aggregated value. 
-class IntAvgAggregate extends AvgAggregate[Int] { - private var avgValue: Double = 0 - private var count: Int = 0 - - override def initiateAggregate: Unit = { - avgValue = 0 - count = 0 - } - - override def aggregate(value: Any): Unit = { - count += 1 - val current = value.asInstanceOf[Int] - avgValue += (current - avgValue) / count - } - - override def getAggregated(): Int = { - avgValue.toInt - } -} - -// Long average aggregate return Long as aggregated value. -class LongAvgAggregate extends AvgAggregate[Long] { - private var avgValue: Double = 0 - private var count: Int = 0 - - override def initiateAggregate: Unit = { - avgValue = 0 - count = 0 - } - - override def aggregate(value: Any): Unit = { - count += 1 - val current = value.asInstanceOf[Long] - avgValue += (current - avgValue) / count - } - - override def getAggregated(): Long = { - avgValue.toLong - } -} - -// Float average aggregate return Float as aggregated value. -class FloatAvgAggregate extends AvgAggregate[Float] { - private var avgValue: Double = 0 - private var count: Int = 0 - - override def initiateAggregate: Unit = { - avgValue = 0 - count = 0 - } - - override def aggregate(value: Any): Unit = { - count += 1 - val current = value.asInstanceOf[Float] - avgValue += (current - avgValue) / count - } - - override def getAggregated(): Float = { - avgValue.toFloat - } -} - -// Double average aggregate return Double as aggregated value. 
-class DoubleAvgAggregate extends AvgAggregate[Double] { - private var avgValue: Double = 0 - private var count: Int = 0 - - override def initiateAggregate: Unit = { - avgValue = 0 - count = 0 - } - - override def aggregate(value: Any): Unit = { - count += 1 - val current = value.asInstanceOf[Double] - avgValue += (current - avgValue) / count - } - - override def getAggregated(): Double = { - avgValue - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala deleted file mode 100644 index ab6b170..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.plan.functions.aggregate - -class CountAggregate extends Aggregate[Long] { - private var count: Long = 0L - - override def initiateAggregate: Unit = { - count = 0 - } - - override def aggregate(value: Any): Unit = { - count += 1 - } - - override def getAggregated(): Long = { - count - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala deleted file mode 100644 index 072eb3f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.plan.functions.aggregate - -abstract class MaxAggregate[T] extends Aggregate[T]{ - -} - -class TinyIntMaxAggregate extends MaxAggregate[Byte] { - private var max = Byte.MaxValue - - override def initiateAggregate: Unit = { - max = Byte.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Byte] - if (current < max) { - max = current - } - } - - override def getAggregated(): Byte = { - max - } -} - -class SmallIntMaxAggregate extends MaxAggregate[Short] { - private var max = Short.MaxValue - - override def initiateAggregate: Unit = { - max = Short.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Short] - if (current < max) { - max = current - } - } - - override def getAggregated(): Short = { - max - } -} - -class IntMaxAggregate extends MaxAggregate[Int] { - private var max = Int.MaxValue - - override def initiateAggregate: Unit = { - max = Int.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Int] - if (current < max) { - max = current - } - } - - override def getAggregated(): Int = { - max - } -} - -class LongMaxAggregate extends MaxAggregate[Long] { - private var max = Long.MaxValue - - override def initiateAggregate: Unit = { - max = Int.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Long] - if (current < max) { - max = current - } - } - - override def getAggregated(): Long = { - max - } -} - -class FloatMaxAggregate extends MaxAggregate[Float] { - private var max = Float.MaxValue - - override def initiateAggregate: Unit = { - max = Int.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Float] - if (current < max) { - max = current - } - } - - override def getAggregated(): Float = { - max - } -} - -class DoubleMaxAggregate extends MaxAggregate[Double] { - private var max = Double.MaxValue - - 
override def initiateAggregate: Unit = { - max = Int.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Double] - if (current < max) { - max = current - } - } - - override def getAggregated(): Double = { - max - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala deleted file mode 100644 index c233b8e..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.plan.functions.aggregate - -abstract class MinAggregate[T] extends Aggregate[T]{ - -} - -class TinyIntMinAggregate extends MinAggregate[Byte] { - private var min = Byte.MaxValue - - override def initiateAggregate: Unit = { - min = Byte.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Byte] - if (current < min) { - min = current - } - } - - override def getAggregated(): Byte = { - min - } -} - -class SmallIntMinAggregate extends MinAggregate[Short] { - private var min = Short.MaxValue - - override def initiateAggregate: Unit = { - min = Short.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Short] - if (current < min) { - min = current - } - } - - override def getAggregated(): Short = { - min - } -} - -class IntMinAggregate extends MinAggregate[Int] { - private var min = Int.MaxValue - - override def initiateAggregate: Unit = { - min = Int.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Int] - if (current < min) { - min = current - } - } - - override def getAggregated(): Int = { - min - } -} - -class LongMinAggregate extends MinAggregate[Long] { - private var min = Long.MaxValue - - override def initiateAggregate: Unit = { - min = Int.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Long] - if (current < min) { - min = current - } - } - - override def getAggregated(): Long = { - min - } -} - -class FloatMinAggregate extends MinAggregate[Float] { - private var min = Float.MaxValue - - override def initiateAggregate: Unit = { - min = Int.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Float] - if (current < min) { - min = current - } - } - - override def getAggregated(): Float = { - min - } -} - -class DoubleMinAggregate extends MinAggregate[Double] { - private var min = Double.MaxValue - - 
override def initiateAggregate: Unit = { - min = Int.MaxValue - } - - override def aggregate(value: Any): Unit = { - val current = value.asInstanceOf[Double] - if (current < min) { - min = current - } - } - - override def getAggregated(): Double = { - min - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala deleted file mode 100644 index 14d1a73..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan.functions.aggregate - -abstract class SumAggregate[T] extends Aggregate[T]{ - -} - -// TinyInt sum aggregate return Int as aggregated value. 
-class TinyIntSumAggregate extends SumAggregate[Int] { - - private var sumValue: Int = 0 - - override def initiateAggregate: Unit = { - sumValue = 0 - } - - - override def getAggregated(): Int = { - sumValue - } - - override def aggregate(value: Any): Unit = { - sumValue += value.asInstanceOf[Byte] - } -} - -// SmallInt sum aggregate return Int as aggregated value. -class SmallIntSumAggregate extends SumAggregate[Int] { - - private var sumValue: Int = 0 - - override def initiateAggregate: Unit = { - sumValue = 0 - } - - override def getAggregated(): Int = { - sumValue - } - - override def aggregate(value: Any): Unit = { - sumValue += value.asInstanceOf[Short] - } -} - -// Int sum aggregate return Int as aggregated value. -class IntSumAggregate extends SumAggregate[Int] { - - private var sumValue: Int = 0 - - override def initiateAggregate: Unit = { - sumValue = 0 - } - - - override def getAggregated(): Int = { - sumValue - } - - override def aggregate(value: Any): Unit = { - sumValue += value.asInstanceOf[Int] - } -} - -// Long sum aggregate return Long as aggregated value. -class LongSumAggregate extends SumAggregate[Long] { - - private var sumValue: Long = 0L - - override def initiateAggregate: Unit = { - sumValue = 0 - } - - override def aggregate(value: Any): Unit = { - sumValue += value.asInstanceOf[Long] - } - - override def getAggregated(): Long = { - sumValue - } -} - -// Float sum aggregate return Float as aggregated value. -class FloatSumAggregate extends SumAggregate[Float] { - private var sumValue: Float = 0 - - override def initiateAggregate: Unit = { - sumValue = 0 - } - - override def aggregate(value: Any): Unit = { - sumValue += value.asInstanceOf[Float] - } - - override def getAggregated(): Float = { - sumValue - } -} - -// Double sum aggregate return Double as aggregated value. 
-class DoubleSumAggregate extends SumAggregate[Double] { - private var sumValue: Double = 0 - - override def initiateAggregate: Unit = { - sumValue = 0 - } - - override def aggregate(value: Any): Unit = { - sumValue += value.asInstanceOf[Double] - } - - override def getAggregated(): Double = { - sumValue - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala index 70810c8..ad7e0e9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala @@ -25,6 +25,11 @@ import org.apache.flink.api.common.functions.GroupReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.{TableConfig, Row} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.api.table.typeinfo.RowTypeInfo +import org.apache.flink.api.common.typeinfo.TypeInformation +import scala.collection.JavaConverters._ +import org.apache.flink.api.table.plan.TypeConverter /** * Flink RelNode which matches along with ReduceGroupOperator. 
@@ -36,7 +41,7 @@ class DataSetGroupReduce( rowType: RelDataType, opName: String, groupingKeys: Array[Int], - func: GroupReduceFunction[Any, Any]) + func: GroupReduceFunction[Row, Row]) extends SingleRel(cluster, traitSet, input) with DataSetRel { @@ -61,6 +66,27 @@ class DataSetGroupReduce( override def translateToPlan( config: TableConfig, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - ??? + + val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config) + + // get the output types + val fieldsNames = rowType.getFieldNames + val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala + .map(f => f.getType.getSqlTypeName) + .map(n => TypeConverter.sqlTypeToTypeInfo(n)) + .toArray + + val rowTypeInfo = new RowTypeInfo(fieldTypes) + + if (groupingKeys.length > 0) { + inputDS.asInstanceOf[DataSet[Row]].groupBy(groupingKeys: _*).reduceGroup(func) + .returns(rowTypeInfo) + .asInstanceOf[DataSet[Any]] + } + else { + // global aggregation + inputDS.asInstanceOf[DataSet[Row]].reduceGroup(func) + .returns(rowTypeInfo).asInstanceOf[DataSet[Any]] + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala index 6f988be..de436be 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -18,9 +18,9 @@ package org.apache.flink.api.table.plan.nodes.dataset -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.plan.{RelTraitSet, 
RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -42,7 +42,7 @@ class DataSetJoin(
     joinKeysRight: Array[Int],
     joinType: JoinType,
     joinHint: JoinHint,
-    func: JoinFunction[Any, Any, Any])
+    func: JoinFunction[Row, Row, Row])
   extends BiRel(cluster, traitSet, left, right)
   with DataSetRel {

http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
index f66cb71..1fca03a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala
@@ -57,20 +57,4 @@ class FlinkAggregate(
       aggCalls
     )
   }
-
-  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
-
-    val origCosts = super.computeSelfCost(planner)
-    val deltaCost = planner.getCostFactory.makeHugeCost()
-
-    // only prefer aggregations with transformed Avg
-    aggCalls.toList.foldLeft[RelOptCost](origCosts){
-      (c: RelOptCost, a: AggregateCall) =>
-        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
-          c.plus(deltaCost)
-        } else {
-          c
-        }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 97e8b32..ac52b48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -60,7 +60,8 @@ object FlinkRuleSets {
     AggregateRemoveRule.INSTANCE,
     AggregateJoinTransposeRule.EXTENDED,
     AggregateUnionAggregateRule.INSTANCE,
-    AggregateReduceFunctionsRule.INSTANCE,
+    // deactivate this rule temporarily
+    // AggregateReduceFunctionsRule.INSTANCE,
     AggregateExpandDistinctAggregatesRule.INSTANCE,

     // remove unnecessary sort rule

http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
index 9ecd9d0..c6afb8a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
@@ -21,11 +21,10 @@ package org.apache.flink.api.table.plan.rules.dataset
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.plan.functions.aggregate.AggregateFactory
 import
org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetGroupReduce}
 import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, FlinkConvention}
-
 import scala.collection.JavaConversions._
+import org.apache.flink.api.table.runtime.aggregate.AggregateFactory

 class DataSetAggregateRule
   extends ConverterRule(
@@ -40,11 +39,13 @@ class DataSetAggregateRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
     val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)

-    val grouping = agg.getGroupSet.asList().map {
-      case a: java.lang.Integer => a.intValue
-    }.toArray
+    val grouping = agg.getGroupSet.toArray
+
+    val inputType = agg.getInput.getRowType()

-    val aggregateFunction = AggregateFactory.createAggregateInstance(agg.getAggCallList)
+    // add grouping fields, position keys in the input, and input type
+    val aggregateFunction = AggregateFactory.createAggregateInstance(agg.getAggCallList,
+      inputType, grouping)

     new DataSetGroupReduce(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
index 69c86c8..3d2117d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
@@ -20,17 +20,10 @@ package org.apache.flink.api.table.plan.rules.dataset

 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import
org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataType} import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rex.{RexCall, RexInputRef} -import org.apache.calcite.sql.SqlKind -import org.apache.flink.api.table.plan.PlanGenException +import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetJoin} -import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, FlinkJoin} -import org.apache.flink.api.table.plan.TypeConverter._ - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer +import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention} class DataSetJoinRule extends ConverterRule( @@ -46,10 +39,6 @@ class DataSetJoinRule val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE) - val joinKeys = getJoinKeys(join) - - // There would be a FlinkProject after FlinkJoin to handle the output fields afterward join, - // so we do not need JoinFunction here by now. new DataSetJoin( rel.getCluster, traitSet, @@ -57,93 +46,12 @@ class DataSetJoinRule convRight, rel.getRowType, join.toString, - joinKeys._1, - joinKeys._2, - sqlJoinTypeToFlinkJoinType(join.getJoinType), + Array[Int](), + Array[Int](), + JoinType.INNER, null, null) } - - private def getJoinKeys(join: FlinkJoin): (Array[Int], Array[Int]) = { - val joinKeys = ArrayBuffer.empty[(Int, Int)] - parseJoinRexNode(join.getCondition.asInstanceOf[RexCall], joinKeys) - - val joinedRowType= join.getRowType - val leftRowType = join.getLeft.getRowType - val rightRowType = join.getRight.getRowType - - // The fetched join key index from Calcite is based on joined row type, we need - // the join key index based on left/right input row type. 
- val joinKeyPairs: ArrayBuffer[(Int, Int)] = joinKeys.map { - case (first, second) => - var leftIndex = findIndexInSingleInput(first, joinedRowType, leftRowType) - if (leftIndex == -1) { - leftIndex = findIndexInSingleInput(second, joinedRowType, leftRowType) - if (leftIndex == -1) { - throw new PlanGenException("Invalid join condition, could not find " + - joinedRowType.getFieldNames.get(first) + " and " + - joinedRowType.getFieldNames.get(second) + " in left table") - } - val rightIndex = findIndexInSingleInput(first, joinedRowType, rightRowType) - if (rightIndex == -1) { - throw new PlanGenException("Invalid join condition could not find " + - joinedRowType.getFieldNames.get(first) + " in right table") - } - (leftIndex, rightIndex) - } else { - val rightIndex = findIndexInSingleInput(second, joinedRowType, rightRowType) - if (rightIndex == -1) { - throw new PlanGenException("Invalid join condition could not find " + - joinedRowType.getFieldNames.get(second) + " in right table") - } - (leftIndex, rightIndex) - } - } - - val joinKeysPair = joinKeyPairs.unzip - - (joinKeysPair._1.toArray, joinKeysPair._2.toArray) - } - - // Parse the join condition recursively, find all the join keys' index. - private def parseJoinRexNode(condition: RexCall, joinKeys: ArrayBuffer[(Int, Int)]): Unit = { - condition.getOperator.getKind match { - case SqlKind.AND => - condition.getOperands.foreach { - operand => parseJoinRexNode(operand.asInstanceOf[RexCall], joinKeys) - } - case SqlKind.EQUALS => - val operands = condition.getOperands - val leftIndex = operands(0).asInstanceOf[RexInputRef].getIndex - val rightIndex = operands(1).asInstanceOf[RexInputRef].getIndex - joinKeys += (leftIndex -> rightIndex) - case _ => - // Do not support operands like OR in join condition due to the limitation - // of current Flink JoinOperator implementation. 
- throw new PlanGenException("Do not support operands other than " + - "AND and Equals in join condition now.") - } - } - - // Find the field index of input row type. - private def findIndexInSingleInput( - globalIndex: Int, - joinedRowType: RelDataType, - inputRowType: RelDataType): Int = { - - val fieldType: RelDataTypeField = joinedRowType.getFieldList.get(globalIndex) - inputRowType.getFieldList.zipWithIndex.foreach { - case (inputFieldType, index) => - if (inputFieldType.getName.equals(fieldType.getName) - && inputFieldType.getType.equals(fieldType.getType)) { - - return index - } - } - - // return -1 if match none field of input row type. - -1 - } } object DataSetJoinRule { http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/AggregateFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/AggregateFunction.scala new file mode 100644 index 0000000..47f903f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/AggregateFunction.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime + +import java.lang.Iterable +import com.google.common.base.Preconditions +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import scala.collection.JavaConversions._ +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.runtime.aggregate.Aggregate + +/** + * A wrapper Flink GroupReduceOperator UDF of aggregates. It takes the grouped data as input, + * feed to the aggregates, and collect the record with aggregated value. + * + * @param aggregates SQL aggregate functions. + * @param fields The grouped keys' indices in the input. + * @param groupingKeys The grouping keys' positions. + */ +class AggregateFunction( + private val aggregates: Array[Aggregate[_ <: Any]], + private val fields: Array[Int], + private val groupingKeys: Array[Int]) extends RichGroupReduceFunction[Row, Row] { + + override def open(config: Configuration) { + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(fields) + Preconditions.checkNotNull(groupingKeys) + Preconditions.checkArgument(aggregates.size == fields.size) + } + + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + aggregates.foreach(_.initiateAggregate) + + var currentRecord: Row = null + + // iterate all input records, feed to each aggregate. 
+ val recordIterator = records.iterator + while (recordIterator.hasNext) { + currentRecord = recordIterator.next() + for (i <- 0 until aggregates.length) { + aggregates(i).aggregate(currentRecord.productElement(fields(i))) + } + } + + // output a new Row type that contains the grouping keys and aggregates + var outValue: Row = new Row(groupingKeys.length + aggregates.length) + + // copy the grouping fields from the last input row to the output row + for (i <- 0 until groupingKeys.length) { + outValue.setField(i, currentRecord.productElement(groupingKeys(i))) + } + // copy the results of the aggregate functions to the output row + for (i <- groupingKeys.length until groupingKeys.length + aggregates.length) { + outValue.setField(i, aggregates(i - groupingKeys.length).getAggregated) + } + out.collect(outValue) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala new file mode 100644 index 0000000..5bc744a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime.aggregate + +/** + * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it + * with grouped aggregate field values, and finally get the aggregated value. + * @tparam T the output type + */ +trait Aggregate[T] extends Serializable { + /** + * Initialize the aggregate state. + */ + def initiateAggregate + + /** + * Feed the aggregate field value. + * @param value + */ + def aggregate(value: Any) + + /** + * Return final aggregated value. + * @return + */ + def getAggregated(): T +} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateFactory.scala new file mode 100644 index 0000000..bb045fe --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateFactory.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime.aggregate + +import java.util +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.sql.SqlAggFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.sql.`type`.SqlTypeName._ +import org.apache.calcite.sql.fun._ +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.table.plan.PlanGenException +import org.apache.flink.api.table.runtime.AggregateFunction +import org.apache.flink.api.table.Row +import org.apache.calcite.rel.`type`.RelDataType + +object AggregateFactory { + + def createAggregateInstance(aggregateCalls: Seq[AggregateCall], + inputType: RelDataType, groupings: Array[Int]): RichGroupReduceFunction[Row, Row] = { + + val fieldIndexes = new Array[Int](aggregateCalls.size) + val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size) + aggregateCalls.zipWithIndex.map { case (aggregateCall, index) => + val argList: util.List[Integer] = aggregateCall.getArgList + // currently assume only aggregate on singleton field. 
+ if (argList.isEmpty) { + if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) { + fieldIndexes(index) = 0 + } else { + throw new PlanGenException("Aggregate fields should not be empty.") + } + } else { + fieldIndexes(index) = argList.get(0) + } + val sqlTypeName = inputType.getFieldList.get(fieldIndexes(index)).getType.getSqlTypeName + aggregateCall.getAggregation match { + case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => { + sqlTypeName match { + case TINYINT => + aggregates(index) = new SumAggregate[Byte] + case SMALLINT => + aggregates(index) = new SumAggregate[Short] + case INTEGER => + aggregates(index) = new SumAggregate[Int] + case BIGINT => + aggregates(index) = new SumAggregate[Long] + case FLOAT => + aggregates(index) = new SumAggregate[Float] + case DOUBLE => + aggregates(index) = new SumAggregate[Double] + case sqlType: SqlTypeName => + throw new PlanGenException("Sum aggregate does no support type:" + sqlType) + } + } + case _: SqlAvgAggFunction => { + sqlTypeName match { + case TINYINT => + aggregates(index) = new ByteAvgAggregate + case SMALLINT => + aggregates(index) = new ShortAvgAggregate + case INTEGER => + aggregates(index) = new IntAvgAggregate + case BIGINT => + aggregates(index) = new LongAvgAggregate + case FLOAT => + aggregates(index) = new FloatAvgAggregate + case DOUBLE => + aggregates(index) = new DoubleAvgAggregate + case sqlType: SqlTypeName => + throw new PlanGenException("Avg aggregate does no support type:" + sqlType) + } + } + case sqlMinMaxFunction: SqlMinMaxAggFunction => { + if (sqlMinMaxFunction.isMin) { + sqlTypeName match { + case TINYINT => + aggregates(index) = new TinyMinAggregate + case SMALLINT => + aggregates(index) = new SmallMinAggregate + case INTEGER => + aggregates(index) = new IntMinAggregate + case BIGINT => + aggregates(index) = new LongMinAggregate + case FLOAT => + aggregates(index) = new FloatMinAggregate + case DOUBLE => + aggregates(index) = new DoubleMinAggregate + case 
sqlType: SqlTypeName => + throw new PlanGenException("Min aggregate does no support type:" + sqlType) + } + } else { + sqlTypeName match { + case TINYINT => + aggregates(index) = new TinyIntMaxAggregate + case SMALLINT => + aggregates(index) = new SmallIntMaxAggregate + case INTEGER => + aggregates(index) = new IntMaxAggregate + case BIGINT => + aggregates(index) = new LongMaxAggregate + case FLOAT => + aggregates(index) = new FloatMaxAggregate + case DOUBLE => + aggregates(index) = new DoubleMaxAggregate + case sqlType: SqlTypeName => + throw new PlanGenException("Max aggregate does no support type:" + sqlType) + } + } + } + case _: SqlCountAggFunction => + aggregates(index) = new CountAggregate + case unSupported: SqlAggFunction => + throw new PlanGenException("unsupported Function: " + unSupported.getName) + } + } + + new AggregateFunction(aggregates, fieldIndexes, groupings) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala new file mode 100644 index 0000000..6a5a5a3 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime.aggregate + +import java.math.BigInteger +import com.google.common.math.LongMath + +// for byte, short, int we return int +abstract class IntegralAvgAggregate[T: Numeric] extends Aggregate[T] { + + var sum: Long = 0 + var count: Long = 0 + + override def initiateAggregate: Unit = { + sum = 0 + count = 0 + } + +} + +class ByteAvgAggregate extends IntegralAvgAggregate[Byte] { + + override def aggregate(value: Any): Unit = { + count += 1 + sum = LongMath.checkedAdd(sum, value.asInstanceOf[Byte]) + } + + override def getAggregated(): Byte = { + (sum / count).toByte + } +} + +class ShortAvgAggregate extends IntegralAvgAggregate[Short] { + + override def aggregate(value: Any): Unit = { + count += 1 + sum = LongMath.checkedAdd(sum, value.asInstanceOf[Short]) + } + + override def getAggregated(): Short = { + (sum / count).toShort + } +} + +class IntAvgAggregate extends IntegralAvgAggregate[Int] { + + override def aggregate(value: Any): Unit = { + count += 1 + sum = LongMath.checkedAdd(sum, value.asInstanceOf[Int]) + } + + override def getAggregated(): Int = { + (sum / count).toInt + } +} + +// Long average aggregate return Long as aggregated value. 
+class LongAvgAggregate extends Aggregate[Long] { + + var sum: BigInteger = BigInteger.ZERO + var count: Long = 0 + + override def initiateAggregate: Unit = { + sum = BigInteger.ZERO + count = 0 + } + + override def aggregate(value: Any): Unit = { + count += 1 + sum = sum.add(BigInteger.valueOf(value.asInstanceOf[Long])) + } + + override def getAggregated(): Long = { + sum.divide(BigInteger.valueOf(count)).longValue + } +} + +// Float average aggregate return Float as aggregated value. +abstract class FloatingPointAvgAggregate[T: Numeric] extends Aggregate[T] { + + var avgValue: Double = 0 + var count: Long = 0 + + override def initiateAggregate: Unit = { + avgValue = 0 + count = 0 + } +} + +// Double average aggregate return Double as aggregated value. +class FloatAvgAggregate extends FloatingPointAvgAggregate[Float] { + + override def aggregate(value: Any): Unit = { + count += 1 + val current = value.asInstanceOf[Float] + avgValue += (current - avgValue) / count + } + + override def getAggregated(): Float = { + avgValue.toFloat + } +} + +// Double average aggregate return Double as aggregated value. 
+class DoubleAvgAggregate extends FloatingPointAvgAggregate[Double] { + + override def aggregate(value: Any): Unit = { + count += 1 + val current = value.asInstanceOf[Double] + avgValue += (current - avgValue) / count + } + + override def getAggregated(): Double = { + avgValue + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala new file mode 100644 index 0000000..b2dd434 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.runtime.aggregate + +class CountAggregate extends Aggregate[Long] { + private var count: Long = 0L + + override def initiateAggregate: Unit = { + count = 0 + } + + override def aggregate(value: Any): Unit = { + count += 1 + } + + override def getAggregated(): Long = { + count + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala new file mode 100644 index 0000000..3cf0ba9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.runtime.aggregate + +abstract class MaxAggregate[T: Numeric] extends Aggregate[T] { + + var result: T = _ + val numericResult = implicitly[Numeric[T]] + + override def aggregate(value: Any): Unit = { + val input: T = value.asInstanceOf[T] + + result = numericResult.max(result, input) + } + + override def getAggregated(): T = { + result + } + +} + +// Numeric doesn't have min value +class TinyIntMaxAggregate extends MaxAggregate[Byte] { + + override def initiateAggregate: Unit = { + result = Byte.MinValue + } + +} + +class SmallIntMaxAggregate extends MaxAggregate[Short] { + + override def initiateAggregate: Unit = { + result = Short.MinValue + } + +} + +class IntMaxAggregate extends MaxAggregate[Int] { + + override def initiateAggregate: Unit = { + result = Int.MinValue + } + +} + +class LongMaxAggregate extends MaxAggregate[Long] { + + override def initiateAggregate: Unit = { + result = Long.MinValue + } + +} + +class FloatMaxAggregate extends MaxAggregate[Float] { + + override def initiateAggregate: Unit = { + result = Float.MinValue + } + +} + +class DoubleMaxAggregate extends MaxAggregate[Double] { + + override def initiateAggregate: Unit = { + result = Double.MinValue + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala new file mode 100644 index 0000000..e024bb4 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime.aggregate + +import scala.reflect.runtime.universe._ + +abstract class MinAggregate[T: Numeric] extends Aggregate[T] { + + var result: T = _ + val numericResult = implicitly[Numeric[T]] + + override def aggregate(value: Any): Unit = { + val input: T = value.asInstanceOf[T] + + result = numericResult.min(result, input) + } + + override def getAggregated(): T = { + result + } + +} + +// Numeric doesn't have max value +class TinyMinAggregate extends MinAggregate[Byte] { + + override def initiateAggregate: Unit = { + result = Byte.MaxValue + } + +} + +class SmallMinAggregate extends MinAggregate[Short] { + + override def initiateAggregate: Unit = { + result = Short.MaxValue + } + +} + +class IntMinAggregate extends MinAggregate[Int] { + + override def initiateAggregate: Unit = { + result = Int.MaxValue + } + +} + +class LongMinAggregate extends MinAggregate[Long] { + + override def initiateAggregate: Unit = { + result = Long.MaxValue + } + +} + +class FloatMinAggregate extends MinAggregate[Float] { + + override def initiateAggregate: Unit = { + result = Float.MaxValue + } + +} + +class DoubleMinAggregate extends MinAggregate[Double] { + + override def initiateAggregate: Unit = { + result = Double.MaxValue + } + +} 
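Note on the min/max aggregates above: because Numeric exposes no minimum or maximum value, each concrete subclass seeds `result` with the extreme of its type in `initiateAggregate`. The following is a minimal, self-contained sketch of that lifecycle, not the committed code. The `Aggregate` trait is a hypothetical stand-in (its definition is not part of this excerpt), `result` is initialized to `zero` rather than `_`, and `MinAggregateSketch.minOf` is an illustrative helper.

```scala
// Hypothetical stand-in for the Aggregate trait; the real definition
// lives in Aggregate.scala and is not shown in this excerpt.
trait Aggregate[T] {
  def initiateAggregate: Unit
  def aggregate(value: Any): Unit
  def getAggregated(): T
}

// Simplified variant of the MinAggregate pattern from the diff above.
abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
  protected val numericResult: Numeric[T] = implicitly[Numeric[T]]
  // Placeholder value; subclasses overwrite it in initiateAggregate.
  protected var result: T = numericResult.zero

  override def aggregate(value: Any): Unit = {
    result = numericResult.min(result, value.asInstanceOf[T])
  }

  override def getAggregated(): T = result
}

// Numeric has no max value, so the subclass supplies the seed.
class IntMinAggregate extends MinAggregate[Int] {
  override def initiateAggregate: Unit = {
    result = Int.MaxValue
  }
}

object MinAggregateSketch {
  // Illustrative helper: reset the state, feed every value, read the result.
  def minOf(values: Seq[Int]): Int = {
    val agg = new IntMinAggregate
    agg.initiateAggregate
    values.foreach(v => agg.aggregate(v))
    agg.getAggregated()
  }
}
```

In this sketch an empty input returns the seed `Int.MaxValue`, since `aggregate` is never called. A caller that needs to tell an empty group apart from a real result would have to track that separately.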
http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala new file mode 100644 index 0000000..84e1ae7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime.aggregate + +class SumAggregate[T: Numeric] extends Aggregate[T] { + + private var result: T = _ + val numericResult = implicitly[Numeric[T]] + /** + * Initialize the aggregate state. + */ + override def initiateAggregate: Unit = { + result = implicitly[Numeric[T]].zero + } + + /** + * Feed the aggregate field value. 
+ * + * @param value + */ + override def aggregate(value: Any): Unit = { + val input: T = value.asInstanceOf[T] + + result = numericResult.plus(result, input.asInstanceOf[T]) + } + + /** + * Return final aggregated value. + * + * @return + */ + override def getAggregated(): T = { + result + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java index dd51b14..8e81893 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java @@ -50,6 +50,7 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; + import scala.NotImplementedError; import java.util.List; @@ -61,7 +62,8 @@ public class AggregationsITCase extends MultipleProgramsTestBase { super(mode); } - @Test(expected = NotImplementedError.class) + @Ignore //DataSetMap needs to be implemented + @Test public void testAggregationTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -93,7 +95,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testWorkingAggregationDataTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -103,8 +105,7 @@ public class 
AggregationsITCase extends MultipleProgramsTestBase { new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao")); - Table table = - tableEnv.fromDataSet(input); + Table table = tableEnv.fromDataSet(input); Table result = table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count"); @@ -115,6 +116,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } + @Ignore // it seems like the arithmetic expression is added to the field position @Test(expected = NotImplementedError.class) public void testAggregationWithArithmetic() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -138,7 +140,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testAggregationWithTwoCount() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -199,6 +201,5 @@ public class AggregationsITCase extends MultipleProgramsTestBase { String expected = ""; compareResultAsText(results, expected); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java index 51f666e..222f161 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java @@ -27,11 +27,9 @@ 
import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.test.TableProgramsTestBase; -import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.NotImplementedError; import java.util.List; http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java index f48be48..b8ca4cd 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java @@ -26,11 +26,9 @@ import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.test.TableProgramsTestBase; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.NotImplementedError; import java.util.List; http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java index 524dd4e..910f601 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java @@ -59,7 +59,7 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testGroupedAggregate() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -78,7 +78,7 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testGroupingKeyForwardIfNotUsed() throws Exception { // the grouping key needs to be forwarded to the intermediate DataSet, even @@ -101,7 +101,7 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testGroupNoAggregation() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java index ada0e06..c4ac138 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java +++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java @@ -26,11 +26,9 @@ import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.test.TableProgramsTestBase; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.NotImplementedError; import java.util.List; http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java index ec4cd1c..8876dc8 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java @@ -23,7 +23,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.table.ExpressionException; import org.apache.flink.api.table.Row; import org.apache.flink.api.table.Table; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; http://git-wip-us.apache.org/repos/asf/flink/blob/7972426e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala index 76bdcba..64f6757 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala @@ -27,13 +27,13 @@ import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized - import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - @Test(expected = classOf[NotImplementedError]) + @Ignore //DataSetMap needs to be implemented + @Test def testAggregationTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -57,7 +57,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testWorkingAggregationDataTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -71,6 +71,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Ignore // it seems like the arithmetic expression is added to the field position @Test(expected = classOf[NotImplementedError]) def testAggregationWithArithmetic(): Unit = { @@ -83,7 +84,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testAggregationWithTwoCount(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ 
-120,7 +121,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testSQLStyleAggregations(): Unit = { // the grouping key needs to be forwarded to the intermediate DataSet, even