From: aljoscha@apache.org To: commits@flink.apache.org Date: Fri, 03 Apr 2015 08:34:56 -0000 Subject: [2/4] flink git commit: [FLINK-1788] [table] Make logical plans transformable http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala index b0e2d05..ba376f5 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.table.runtime import org.apache.flink.api.table.codegen.GenerateUnaryPredicate -import org.apache.flink.api.table.tree.{NopExpression, Expression} +import org.apache.flink.api.table.expressions.{NopExpression, Expression} import org.apache.flink.api.common.functions.RichFilterFunction import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.configuration.Configuration http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala index f0f5636..f5616d3 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.api.table.runtime -import org.apache.flink.api.table.tree.{NopExpression, Expression} +import
org.apache.flink.api.table.expressions.{NopExpression, Expression} import org.apache.flink.api.common.functions.RichFlatJoinFunction import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.table.codegen.{GenerateBinaryResultAssembler, http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala index 0a2830b..16e256a 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.api.table.runtime -import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.table.codegen.GenerateUnaryResultAssembler http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala deleted file mode 100644 index 6302572..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.tree - -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation} - -import scala.language.postfixOps - - -abstract class Expression extends Product { - def children: Seq[Expression] - def name: String = Expression.freshName("expression") - def typeInfo: TypeInformation[_] - - /** - * Tests for equality by first testing for reference equality. 
- */ - def fastEquals(other: Expression): Boolean = this.eq(other) || this == other - - def transformPre(rule: PartialFunction[Expression, Expression]): Expression = { - val afterTransform = rule.applyOrElse(this, identity[Expression]) - - if (afterTransform fastEquals this) { - this.transformChildrenPre(rule) - } else { - afterTransform.transformChildrenPre(rule) - } - } - - def transformChildrenPre(rule: PartialFunction[Expression, Expression]): Expression = { - var changed = false - val newArgs = productIterator map { - case child: Expression if children.contains(child) => - val newChild = child.transformPre(rule) - if (newChild fastEquals child) { - child - } else { - changed = true - newChild - } - case other: AnyRef => other - case null => null - } toArray - - if (changed) makeCopy(newArgs) else this - } - - def transformPost(rule: PartialFunction[Expression, Expression]): Expression = { - val afterChildren = transformChildrenPost(rule) - if (afterChildren fastEquals this) { - rule.applyOrElse(this, identity[Expression]) - } else { - rule.applyOrElse(afterChildren, identity[Expression]) - } - } - - def transformChildrenPost(rule: PartialFunction[Expression, Expression]): Expression = { - var changed = false - val newArgs = productIterator map { - case child: Expression if children.contains(child) => - val newChild = child.transformPost(rule) - if (newChild fastEquals child) { - child - } else { - changed = true - newChild - } - case other: AnyRef => other - case null => null - } toArray - // toArray forces evaluation, toSeq does not seem to work here - - if (changed) makeCopy(newArgs) else this - } - - def exists(predicate: Expression => Boolean): Boolean = { - var exists = false - this.transformPre { - case e: Expression => if (predicate(e)) { - exists = true - } - e - } - exists - } - - /** - * Creates a new copy of this expression with new children. This is used during transformation - * if children change. This must be overridden by Expressions that don't have the Constructor - * arguments in the same order as the `children`. 
- */ - def makeCopy(newArgs: Seq[AnyRef]): this.type = { - val defaultCtor = - this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head - try { - defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type] - } catch { - case iae: IllegalArgumentException => - println("IAE " + this) - throw new RuntimeException("Should never happen.") - } - } -} - -abstract class BinaryExpression() extends Expression { - def left: Expression - def right: Expression - def children = Seq(left, right) -} - -abstract class UnaryExpression() extends Expression { - def child: Expression - def children = Seq(child) -} - -abstract class LeafExpression() extends Expression { - val children = Nil -} - -case class NopExpression() extends LeafExpression { - val typeInfo = new NothingTypeInfo() - override val name = Expression.freshName("nop") - -} - -object Expression { - def freshName(prefix: String): String = { - s"$prefix-${freshNameCounter.getAndIncrement}" - } - - val freshNameCounter = new AtomicInteger -} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala deleted file mode 100644 index e5cdac5..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.tree - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.java.aggregation.Aggregations - - -abstract sealed class Aggregation extends UnaryExpression { - def typeInfo = { - child.typeInfo match { - case BasicTypeInfo.LONG_TYPE_INFO => // ok - case BasicTypeInfo.INT_TYPE_INFO => - case BasicTypeInfo.DOUBLE_TYPE_INFO => - case BasicTypeInfo.FLOAT_TYPE_INFO => - case BasicTypeInfo.BYTE_TYPE_INFO => - case BasicTypeInfo.SHORT_TYPE_INFO => - case _ => - throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " + - s"aggregation $this. 
Only numeric data types supported.") - } - child.typeInfo - } - - override def toString = s"Aggregate($child)" - - def getIntermediateFields: Seq[Expression] - def getFinalField(inputs: Seq[Expression]): Expression - def getAggregations: Seq[Aggregations] -} - -case class Sum(child: Expression) extends Aggregation { - override def toString = s"($child).sum" - - override def getIntermediateFields: Seq[Expression] = Seq(child) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.SUM) -} - -case class Min(child: Expression) extends Aggregation { - override def toString = s"($child).min" - - override def getIntermediateFields: Seq[Expression] = Seq(child) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.MIN) - -} - -case class Max(child: Expression) extends Aggregation { - override def toString = s"($child).max" - - override def getIntermediateFields: Seq[Expression] = Seq(child) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.MAX) -} - -case class Count(child: Expression) extends Aggregation { - override def typeInfo = { - child.typeInfo match { - case _ => // we can count anything... :D - } - BasicTypeInfo.INT_TYPE_INFO - } - - override def toString = s"($child).count" - - override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1))) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.SUM) - -} - -case class Avg(child: Expression) extends Aggregation { - override def toString = s"($child).avg" - - override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1)) - // This is just sweet. Use our own AST representation and let the code generator do - // our dirty work. - override def getFinalField(inputs: Seq[Expression]): Expression = - Div(inputs(0), inputs(1)) - override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala deleted file mode 100644 index 84f9b18..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.tree - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation} - -abstract class BinaryArithmetic extends BinaryExpression { - def typeInfo = { - if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""") - } - if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - left.typeInfo - } -} - -case class Plus(left: Expression, right: Expression) extends BinaryArithmetic { - override def typeInfo = { - if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] && - !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) { - throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this") - } - if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] && - !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) { - throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - left.typeInfo - } - - override def toString = s"($left + $right)" -} - -case class UnaryMinus(child: Expression) extends UnaryExpression { - def typeInfo = { - if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""") - } - child.typeInfo - } - - override def toString = s"-($child)" -} - -case class Minus(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left - $right)" -} - -case class Div(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left / $right)" -} - -case class Mul(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left * $right)" -} - -case class Mod(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left * $right)" -} - -case class Abs(child: Expression) extends UnaryExpression { - def typeInfo = child.typeInfo - - override def toString = s"abs($child)" -} - -abstract class BitwiseBinaryArithmetic extends BinaryExpression { - def typeInfo: TypeInformation[_] = { - if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""") - } - if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { - left.typeInfo - } else { - BasicTypeInfo.INT_TYPE_INFO - } - } -} - -case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { - override def toString = s"($left & $right)" -} - -case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { - 
override def toString = s"($left | $right)" -} - - -case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { - override def toString = s"($left ^ $right)" -} - -case class BitwiseNot(child: Expression) extends UnaryExpression { - def typeInfo: TypeInformation[_] = { - if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""") - } - if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { - child.typeInfo - } else { - BasicTypeInfo.INT_TYPE_INFO - } - } - - override def toString = s"~($child)" -} - http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala deleted file mode 100644 index a3acc35..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.tree - -import org.apache.flink.api.common.typeinfo.TypeInformation - -case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression { - def typeInfo = tpe -} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala deleted file mode 100644 index e0a34a9..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.tree - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo} - -abstract class BinaryComparison extends BinaryExpression { - def typeInfo = { - if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException(s"Non-numeric operand ${left} in $this") - } - if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException(s"Non-numeric operand ${right} in $this") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } -} - -case class EqualTo(left: Expression, right: Expression) extends BinaryComparison { - override def typeInfo = { - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override def toString = s"$left === $right" -} - -case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison { - override def typeInfo = { - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override def toString = s"$left !== $right" -} - -case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"$left > $right" -} - -case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"$left >= $right" -} - -case class LessThan(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"$left < $right" -} - -case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"$left <= $right" -} - -case class IsNull(child: Expression) extends UnaryExpression { - def typeInfo = { - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override def toString = s"($child).isNull" -} - -case class IsNotNull(child: Expression) extends UnaryExpression { - def typeInfo = { - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override def toString = s"($child).isNotNull" -} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala deleted file mode 100644 index cc42148..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.tree - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.TypeInformation - -case class UnresolvedFieldReference(override val name: String) extends LeafExpression { - def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this") - - override def toString = "\"" + name -} - -case class ResolvedFieldReference( - override val name: String, - tpe: TypeInformation[_]) extends LeafExpression { - def typeInfo = tpe - - override def toString = s"'$name" -} - -case class Naming(child: Expression, override val name: String) extends UnaryExpression { - def typeInfo = child.typeInfo - - override def toString = s"$child as '$name" -} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala deleted file mode 100644 index 852d5a1..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.tree - -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.scala.table.ImplicitExpressionOperations - -object Literal { - def apply(l: Any): Literal = l match { - case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO) - case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO) - case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO) - case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO) - case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO) - case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO) - } -} - -case class Literal(value: Any, tpe: TypeInformation[_]) - extends LeafExpression with ImplicitExpressionOperations { - def expr = this - def typeInfo = tpe - - override def toString = s"$value" -} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala deleted file mode 100644 index 8ab838d..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.tree - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.BasicTypeInfo - -abstract class BinaryPredicate extends BinaryExpression { - def typeInfo = { - if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO || - right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { - throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } -} - -case class Not(child: Expression) extends UnaryExpression { - def typeInfo = { - if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { - throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override val name = Expression.freshName("not-" + child.name) - - override def toString = s"!($child)" -} - -case class And(left: Expression, right: Expression) extends BinaryPredicate { - override def toString = s"$left && $right" - - override val name = Expression.freshName(left.name + "-and-" + right.name) -} - -case class Or(left: Expression, right: Expression) extends BinaryPredicate { - override def toString = s"$left || $right" - - override val name = Expression.freshName(left.name + "-or-" + right.name) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala deleted file mode 100644 index caac402..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -/** - * This package contains the base class of AST nodes and all the expression language AST classes. - * Expression trees should not be manually constructed by users. They are implicitly constructed - * from the implicit DSL conversions in - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API, - * expression trees should be generated from a string parser that parses expressions and creates - * AST nodes. 
- */ -package object tree http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala deleted file mode 100644 index e14374f..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.tree - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo} - -case class Substring( - str: Expression, - beginIndex: Expression, - endIndex: Expression) extends Expression { - def typeInfo = { - if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) { - throw new ExpressionException( - s"""Operand must be of type String in $this, is ${str.typeInfo}.""") - } - if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""") - } - if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""") - } - - BasicTypeInfo.STRING_TYPE_INFO - } - - override def children: Seq[Expression] = Seq(str, beginIndex, endIndex) - override def toString = s"($str).substring($beginIndex, $endIndex)" -} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala new file mode 100644 index 0000000..87051cf --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.trees + +/** + * Base class for tree analyzers/transformers. Analyzers must implement method `rules` to + * provide the chain of rules that are invoked one after another. The tree resulting + * from one rule is fed into the next rule and the final result is returned from method `analyze`. + */ +abstract class Analyzer[A <: TreeNode[A]] { + + def rules: Seq[Rule[A]] + + final def analyze(expr: A): A = { + var currentTree = expr + for (rule <- rules) { + var running = true + while (running) { + val newTree = rule(currentTree) + if (newTree fastEquals currentTree) { + running = false + } + currentTree = newTree + } + } + currentTree + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala new file mode 100644 index 0000000..b8a27cb --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.trees + +/** + * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `apply` gets a tree + * and must return a tree. The returned tree can also be the input tree. In an [[Analyzer]] + * rule chain the result tree of one [[Rule]] is fed into the next [[Rule]] in the chain. + * + * A [[Rule]] is repeatedly applied to a tree until the tree does not change between + * rule applications.
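+ *
+ * For illustration only (a hypothetical sketch that is not part of this commit; it assumes
+ * that `Expression` extends `TreeNode[Expression]` and reuses the `Not` expression from the
+ * removed logic.scala above), a rule that removes double negation and an analyzer that
+ * applies it could look like this:
+ *
+ * {{{
+ * object RemoveDoubleNegation extends Rule[Expression] {
+ *   def apply(expr: Expression): Expression = expr.transformPost {
+ *     case Not(Not(child)) => child
+ *   }
+ * }
+ *
+ * object SimplifyPredicates extends Analyzer[Expression] {
+ *   def rules = Seq(RemoveDoubleNegation)
+ * }
+ *
+ * // somePredicate is any expression tree
+ * val simplified = SimplifyPredicates.analyze(somePredicate)
+ * }}}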
+ */ +abstract class Rule[A <: TreeNode[A]] { + def apply(expr: A): A +} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala new file mode 100644 index 0000000..84f1d7e --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.trees + +/** + * Generic base class for trees that can be transformed and traversed. + */ +abstract class TreeNode[A <: TreeNode[A]] { self: A with Product => + + /** + * List of child nodes that should be considered when doing transformations. Other values + * in the Product will not be transformed, only handed through. + */ + def children: Seq[A] + + /** + * Tests for equality by first testing for reference equality. 
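+ * When the references differ, it falls back to structural equality via `==`.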
+ */ + def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other + + def transformPre(rule: PartialFunction[A, A]): A = { + val afterTransform = rule.applyOrElse(this, identity[A]) + + if (afterTransform fastEquals this) { + this.transformChildrenPre(rule) + } else { + afterTransform.transformChildrenPre(rule) + } + } + + def transformChildrenPre(rule: PartialFunction[A, A]): A = { + var changed = false + val newArgs = productIterator map { + case child: A if children.contains(child) => + val newChild = child.transformPre(rule) + if (newChild fastEquals child) { + child + } else { + changed = true + newChild + } + case other: AnyRef => other + case null => null + } toArray + + if (changed) makeCopy(newArgs) else this + } + + def transformPost(rule: PartialFunction[A, A]): A = { + val afterChildren = transformChildrenPost(rule) + if (afterChildren fastEquals this) { + rule.applyOrElse(this, identity[A]) + } else { + rule.applyOrElse(afterChildren, identity[A]) + } + } + + def transformChildrenPost(rule: PartialFunction[A, A]): A = { + var changed = false + val newArgs = productIterator map { + case child: A if children.contains(child) => + val newChild = child.transformPost(rule) + if (newChild fastEquals child) { + child + } else { + changed = true + newChild + } + case other: AnyRef => other + case null => null + } toArray + // toArray forces evaluation, toSeq does not seem to work here + + if (changed) makeCopy(newArgs) else this + } + + def exists(predicate: A => Boolean): Boolean = { + var exists = false + this.transformPre { + case e: A => if (predicate(e)) { + exists = true + } + e + } + exists + } + + /** + * Creates a new copy of this expression with new children. This is used during transformation + * if children change. This must be overridden by tree nodes that don't have the Constructor + * arguments in the same order as the `children`. 
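+ *
+ * The default implementation below reflectively picks the first constructor that takes
+ * parameters and passes it all product elements in declaration order. Nodes for which this
+ * lookup would choose the wrong constructor (for example, nodes with several constructors or
+ * with auxiliary non-child parameters) must supply their own copy logic.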
+ */ + def makeCopy(newArgs: Seq[AnyRef]): this.type = { + val defaultCtor = + this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head + try { + defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type] + } catch { + case iae: IllegalArgumentException => + println("IAE " + this) + throw new RuntimeException("Should never happen.") + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala index 7ffa91c..db3c881 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala @@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.table.Row import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo} /** http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala deleted file mode 100644 index 604bdcf..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala - -import org.apache.flink.api.table.tree.Literal -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.examples.java.graph.util.PageRankData -import org.apache.flink.util.Collector - -import _root_.scala.collection.JavaConverters._ - -/** -* A basic implementation of the Page Rank algorithm using a bulk iteration. -* -* This implementation requires a set of pages and a set of directed links as input and works as -* follows. 
-* -* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each -* page collects the partial ranks of all pages that point to it, sums them up, and applies a -* dampening factor to the sum. The result is the new rank of the page. A new iteration is started -* with the new ranks of all pages. This implementation terminates after a fixed number of -* iterations. This is the Wikipedia entry for the -* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]] -* -* Input files are plain text files and must be formatted as follows: -* -* - Pages represented as an (long) ID separated by new-line characters. -* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63. -* - Links are represented as pairs of page IDs which are separated by space characters. Links -* are separated by new-line characters. -* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12), -* (1)->(12), and (42)->(63). For this simple implementation it is required that each page has -* at least one incoming and one outgoing link (a page can point to itself). -* -* Usage: -* {{{ -* PageRankBasic -* }}} -* -* If no parameters are provided, the program is run with default data from -* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations. -* -* This example shows how to use: -* -* - Bulk Iterations -* - Table API expressions -*/ -object PageRankExpression { - - private final val DAMPENING_FACTOR: Double = 0.85 - private final val EPSILON: Double = 0.0001 - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - // set up execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - - // read input data - val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) } - .as('pageId, 'rank) - - val links = getLinksDataSet(env) - - // build adjacency list from link input - val adjacencyLists = links - .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] { - - override def reduce( - values: _root_.java.lang.Iterable[Link], - out: Collector[AdjacencyList]): Unit = { - var outputId = -1L - val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId } - out.collect(new AdjacencyList(outputId, outputList.toArray)) - } - - }).as('sourceId, 'targetIds) - - // start iteration - val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) { - currentRanks => - val newRanks = currentRanks.toTable - // distribute ranks to target pages - .join(adjacencyLists).where('pageId === 'sourceId) - .select('rank, 'targetIds).as[RankOutput] - .flatMap { - (in, out: Collector[(Long, Double)]) => - val targets = in.targetIds - val len = targets.length - targets foreach { t => out.collect((t, in.rank / len )) } - } - .as('pageId, 'rank) - // collect ranks and sum them up - .groupBy('pageId).select('pageId, 'rank.sum as 'rank) - // apply dampening factor - .select( - 'pageId, - ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank) - - - val termination = currentRanks.toTable - .as('curId, 'curRank).join(newRanks.as('newId, 'newRank)) - .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON) - - (newRanks, termination) - } - - val result = finalRanks - - // emit result - if (fileOutput) { - result.writeAsCsv(outputPath, "\n", " ") - } else { - result.print() - } - - // execute program - env.execute("Expression PageRank Example") - } - - // 
************************************************************************* - // USER TYPES - // ************************************************************************* - - case class Link(sourceId: Long, targetId: Long) - - case class Page(pageId: Long, rank: Double) - - case class AdjacencyList(sourceId: Long, targetIds: Array[Long]) - - case class RankOutput(rank: Double, targetIds: Array[Long]) - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - fileOutput = true - if (args.length == 5) { - pagesInputPath = args(0) - linksInputPath = args(1) - outputPath = args(2) - numPages = args(3).toLong - maxIterations = args(4).toInt - } else { - System.err.println("Usage: PageRankBasic ") - false - } - } else { - System.out.println("Executing PageRank Basic example with default parameters and built-in " + - "default data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println(" Usage: PageRankBasic ") - - numPages = PageRankData.getNumberOfPages - } - true - } - - private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = { - if (fileOutput) { - env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n") - .map(x => x._1) - } else { - env.generateSequence(1, 15) - } - } - - private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = { - if (fileOutput) { - env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ", - includedFields = Array(0, 1)) - } else { - val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long], - v2.asInstanceOf[Long])} - env.fromCollection(edges) - } - } - - private var fileOutput: Boolean = false - private var pagesInputPath: String = null - private var linksInputPath: String = null - private var outputPath: String = null - private var numPages: Double = 0 - private var maxIterations: Int = 10 - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala new file mode 100644 index 0000000..c0e0f6c --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.examples.java.graph.util.PageRankData +import org.apache.flink.util.Collector + +import _root_.scala.collection.JavaConverters._ + +/** +* A basic implementation of the Page Rank algorithm using a bulk iteration. +* +* This implementation requires a set of pages and a set of directed links as input and works as +* follows. +* +* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each +* page collects the partial ranks of all pages that point to it, sums them up, and applies a +* dampening factor to the sum. The result is the new rank of the page. A new iteration is started +* with the new ranks of all pages. This implementation terminates after a fixed number of +* iterations. See the Wikipedia entry for the +* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]. +* +* Input files are plain text files and must be formatted as follows: +* +* - Pages are represented by a (long) ID separated by new-line characters. +* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63. +* - Links are represented as pairs of page IDs which are separated by space characters. Links +* are separated by new-line characters. +* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12), +* (1)->(12), and (42)->(63). For this simple implementation it is required that each page has +* at least one incoming and one outgoing link (a page can point to itself). +* +* Usage: +* {{{ +* PageRankTable <pages path> <links path> <output path> <num pages> <num iterations> +* }}} +* +* If no parameters are provided, the program is run with default data from +* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
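+*
+* As a small worked illustration (numbers chosen for this explanation only): with the dampening
+* factor d = 0.85 and N = 5 pages, a page that collects the partial ranks 0.1 and 0.2 is
+* assigned the new rank 0.85 * (0.1 + 0.2) + (1 - 0.85) / 5 = 0.285.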
+ *
+ * This example shows how to use:
+ *
+ *  - Bulk Iterations
+ *  - Table API expressions
+ */
+object PageRankTable {
+
+  private final val DAMPENING_FACTOR: Double = 0.85
+  private final val EPSILON: Double = 0.0001
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
+      .as('pageId, 'rank)
+
+    val links = getLinksDataSet(env)
+
+    // build adjacency list from link input
+    val adjacencyLists = links
+      .groupBy("sourceId").reduceGroup(new GroupReduceFunction[Link, AdjacencyList] {
+
+        override def reduce(
+            values: _root_.java.lang.Iterable[Link],
+            out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+
+      }).as('sourceId, 'targetIds)
+
+    // start iteration
+    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+      currentRanks =>
+        val newRanks = currentRanks.toTable
+          // distribute ranks to target pages
+          .join(adjacencyLists).where('pageId === 'sourceId)
+          .select('rank, 'targetIds).toSet[RankOutput]
+          .flatMap {
+            (in, out: Collector[(Long, Double)]) =>
+              val targets = in.targetIds
+              val len = targets.length
+              targets foreach { t => out.collect((t, in.rank / len)) }
+          }
+          .as('pageId, 'rank)
+          // collect ranks and sum them up
+          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
+          // apply dampening factor
+          .select(
+            'pageId,
+            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)
+
+        val termination = currentRanks.toTable
+          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
+          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
+
+        (newRanks, termination)
+    }
+
+    val result = finalRanks
+
+    // emit result
+    if (fileOutput) {
+      result.writeAsCsv(outputPath, "\n", " ")
+    } else {
+      result.print()
+    }
+
+    // execute program
+    env.execute("Expression PageRank Example")
+  }
+
+  // *************************************************************************
+  //     USER TYPES
+  // *************************************************************************
+
+  case class Link(sourceId: Long, targetId: Long)
+
+  case class Page(pageId: Long, rank: Double)
+
+  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
+  case class RankOutput(rank: Double, targetIds: Array[Long])
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 5) {
+        pagesInputPath = args(0)
+        linksInputPath = args(1)
+        outputPath = args(2)
+        numPages = args(3).toLong
+        maxIterations = args(4).toInt
+      } else {
+        System.err.println("Usage: PageRankTable <pages path> <links path> <output path> " +
+          "<num pages> <num iterations>")
+        return false
+      }
+    } else {
+      System.out.println("Executing PageRank Table example with default parameters and " +
+        "built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: PageRankTable <pages path> <links path> <output path> " +
+        "<num pages> <num iterations>")
+
+      numPages = PageRankData.getNumberOfPages
+    }
+    true
+  }
+
+  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+    if (fileOutput) {
+      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
+        .map(x => x._1)
+    } else {
+      env.generateSequence(1, 15)
+    }
+  }
+
+  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
+    if (fileOutput) {
+      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+    } else {
+      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
+        v2.asInstanceOf[Long])}
+      env.fromCollection(edges)
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var pagesInputPath: String = null
+  private var linksInputPath: String = null
+  private var outputPath: String = null
+  private var numPages: Double = 0
+  private var maxIterations: Int = 10
+
+}
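[Editor's aside, not part of this commit: the iteration body above encodes the usual dampened PageRank update, newRank = DAMPENING_FACTOR * sum(incoming rank / out-degree) + (1 - DAMPENING_FACTOR) / numPages. A minimal plain-Scala sketch of one such step, with illustrative names and no Flink dependencies:

object PageRankStepSketch {
  val DampeningFactor = 0.85

  // One iteration: split each page's rank evenly over its out-links,
  // sum the contributions per target page, then apply the dampening factor.
  def step(ranks: Map[Long, Double],
           adjacency: Map[Long, Seq[Long]],
           numPages: Double): Map[Long, Double] = {
    val contributions = for {
      (page, targets) <- adjacency.toSeq
      target <- targets
    } yield target -> ranks(page) / targets.length

    contributions.groupBy(_._1).map { case (page, cs) =>
      page -> (cs.map(_._2).sum * DampeningFactor + (1 - DampeningFactor) / numPages)
    }
  }

  def main(args: Array[String]): Unit = {
    val adjacency = Map(1L -> Seq(2L, 3L), 2L -> Seq(3L), 3L -> Seq(1L))
    val uniform = Map(1L -> 1.0 / 3, 2L -> 1.0 / 3, 3L -> 1.0 / 3)
    println(step(uniform, adjacency, numPages = 3)) // one bulk-iteration step
  }
}

The termination table above then keeps the iteration running only while some page's rank still moves by more than EPSILON.]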
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
deleted file mode 100644
index 0ff97bf..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala
-
-import org.apache.flink.streaming.api.scala._
-
-import org.apache.flink.api.scala.table._
-
-import scala.Stream._
-import scala.math._
-import scala.language.postfixOps
-import scala.util.Random
-
-/**
- * Simple example for demonstrating the use of the Table API with Flink Streaming.
- */
-object StreamingExpressionFilter {
-
-  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val cars = genCarStream().toTable
-      .filter('carId === 0)
-      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
-      .as[CarEvent]
-
-    cars.print()
-
-    StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
-
-  }
-
-  def genCarStream(): DataStream[CarEvent] = {
-
-    def nextSpeed(carEvent: CarEvent): CarEvent = {
-      val next =
-        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
-      CarEvent(carEvent.carId, next, carEvent.distance + next / 3.6d, System.currentTimeMillis)
-    }
-    def carStream(speeds: Stream[CarEvent]): Stream[CarEvent] = {
-      Thread.sleep(1000)
-      speeds.append(carStream(speeds.map(nextSpeed)))
-    }
-    carStream(range(0, numOfCars).map(CarEvent(_, 50, 0, System.currentTimeMillis())))
-  }
-
-  def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      if (args.length == 3) {
-        numOfCars = args(0).toInt
-        evictionSec = args(1).toInt
-        triggerMeters = args(2).toDouble
-        true
-      } else {
-        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
-        false
-      }
-    } else {
-      true
-    }
-  }
-
-  var numOfCars = 2
-  var evictionSec = 10
-  var triggerMeters = 50d
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
new file mode 100644
index 0000000..4aa5653
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.streaming.api.scala._
+
+import org.apache.flink.api.scala.table._
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Simple example for demonstrating the use of the Table API with Flink Streaming.
+ */
+object StreamingTableFilter {
+
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val cars = genCarStream().toTable
+      .filter('carId === 0)
+      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
+      .toStream[CarEvent]
+
+    cars.print()
+
+    StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
+
+  }
+
+  def genCarStream(): DataStream[CarEvent] = {
+
+    def nextSpeed(carEvent: CarEvent): CarEvent = {
+      val next =
+        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
+      CarEvent(carEvent.carId, next, carEvent.distance + next / 3.6d, System.currentTimeMillis)
+    }
+    def carStream(speeds: Stream[CarEvent]): Stream[CarEvent] = {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
+    }
+    carStream(range(0, numOfCars).map(CarEvent(_, 50, 0, System.currentTimeMillis())))
+  }
+
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 3) {
+        numOfCars = args(0).toInt
+        evictionSec = args(1).toInt
+        triggerMeters = args(2).toDouble
+        true
+      } else {
+        System.err.println("Usage: StreamingTableFilter <numCars> <evictSec> <triggerMeters>")
+        false
+      }
+    } else {
+      true
+    }
+  }
+
+  var numOfCars = 2
+  var evictionSec = 10
+  var triggerMeters = 50d
+
+}
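[Editor's aside, not part of this commit: relative to the deleted StreamingExpressionFilter, the only functional change in this file is the conversion back out of the Table API, .as[CarEvent] becoming .toStream[CarEvent]; the rest is carried over verbatim. A plain-Scala sketch, with no Flink dependencies, of what the filter/select pipeline computes per event:

object CarFilterSketch extends App {
  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)

  // filter('carId === 0), then select('carId, 'speed,
  // 'distance + 1000 as 'distance, 'time % 5 as 'time):
  def transform(events: Seq[CarEvent]): Seq[CarEvent] =
    events.filter(_.carId == 0)
      .map(e => e.copy(distance = e.distance + 1000, time = e.time % 5))

  println(transform(Seq(CarEvent(0, 55, 20.0, 12L), CarEvent(1, 60, 35.0, 13L))))
  // List(CarEvent(0,55,1020.0,2)) -- the carId == 1 event is dropped
}]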
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
deleted file mode 100644
index 96ec4ba..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.table.tree.Literal
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-
-/**
- * This program implements a modified version of the TPC-H query 3. The
- * example demonstrates how to assign names to fields by extending the Tuple class.
- * The original query can be found at
- * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
- * (page 29).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- * SELECT
- *      l_orderkey,
- *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
- *      o_orderdate,
- *      o_shippriority
- * FROM customer,
- *      orders,
- *      lineitem
- * WHERE
- *      c_mktsegment = '[SEGMENT]'
- *      AND c_custkey = o_custkey
- *      AND l_orderkey = o_orderkey
- *      AND o_orderdate < date '[DATE]'
- *      AND l_shipdate > date '[DATE]'
- * GROUP BY
- *      l_orderkey,
- *      o_orderdate,
- *      o_shippriority;
- * }}}
- *
- * Compared to the original TPC-H query this version does not sort the result by revenue
- * and orderdate.
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator
- * as generated by the TPC-H data generator which is available at
- * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
- *
- * Usage:
- * {{{
- *   TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
- * }}}
- *
- * This example shows how to use:
- *  - Table API expressions
- *
- */
-object TPCHQuery3Expression {
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set filter date
-    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
-    val date = dateFormat.parse("1995-03-12")
-
-    // get execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val lineitems = getLineitemDataSet(env)
-      .filter( l => dateFormat.parse(l.shipDate).after(date) )
-      .as('id, 'extdPrice, 'discount, 'shipDate)
-
-    val customers = getCustomerDataSet(env)
-      .as('id, 'mktSegment)
-      .filter( 'mktSegment === "AUTOMOBILE" )
-
-    val orders = getOrdersDataSet(env)
-      .filter( o => dateFormat.parse(o.orderDate).before(date) )
-      .as('orderId, 'custId, 'orderDate, 'shipPrio)
-
-    val items =
-      orders.join(customers)
-        .where('custId === 'id)
-        .select('orderId, 'orderDate, 'shipPrio)
-      .join(lineitems)
-        .where('orderId === 'id)
-        .select(
-          'orderId,
-          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
-          'orderDate,
-          'shipPrio)
-
-    val result = items
-      .groupBy('orderId, 'orderDate, 'shipPrio)
-      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
-
-    // emit result
-    result.writeAsCsv(outputPath, "\n", "|")
-
-    // execute program
-    env.execute("Scala TPCH Query 3 (Expression) Example")
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
-  case class Customer(id: Long, mktSegment: String)
-  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private var lineitemPath: String = null
-  private var customerPath: String = null
-  private var ordersPath: String = null
-  private var outputPath: String = null
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 4) {
-      lineitemPath = args(0)
-      customerPath = args(1)
-      ordersPath = args(2)
-      outputPath = args(3)
-      true
-    } else {
-      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-        " Due to legal restrictions, we can not ship generated data.\n" +
-        " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
-        " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
-        "<orders-csv path> <result path>");
-      false
-    }
-  }
-
-  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
-    env.readCsvFile[Lineitem](
-      lineitemPath,
-      fieldDelimiter = "|",
-      includedFields = Array(0, 5, 6, 10) )
-  }
-
-  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
-    env.readCsvFile[Customer](
-      customerPath,
-      fieldDelimiter = "|",
-      includedFields = Array(0, 6) )
-  }
-
-  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
-    env.readCsvFile[Order](
-      ordersPath,
-      fieldDelimiter = "|",
-      includedFields = Array(0, 1, 4, 7) )
-  }
-
-}
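[Editor's aside, not part of this commit: besides the object rename, the only change between the deleted file above and the new TPCHQuery3Table below is the Literal import, which follows the package move this commit performs throughout flink-table:

// deleted TPCHQuery3Expression:
import org.apache.flink.api.table.tree.Literal
// new TPCHQuery3Table:
import org.apache.flink.api.table.expressions.Literal
]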
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
new file mode 100644
index 0000000..f527a3c
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields using the as() method.
+ * The original query can be found at
+ * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT
+ *      l_orderkey,
+ *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ *      o_orderdate,
+ *      o_shippriority
+ * FROM customer,
+ *      orders,
+ *      lineitem
+ * WHERE
+ *      c_mktsegment = '[SEGMENT]'
+ *      AND c_custkey = o_custkey
+ *      AND l_orderkey = o_orderkey
+ *      AND o_orderdate < date '[DATE]'
+ *      AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ *      l_orderkey,
+ *      o_orderdate,
+ *      o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
+ *
+ * Usage:
+ * {{{
+ *   TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *
+ * This example shows how to use:
+ *  - Table API expressions
+ *
+ */
+object TPCHQuery3Table {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set filter date
+    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
+    val date = dateFormat.parse("1995-03-12")
+
+    // get execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val lineitems = getLineitemDataSet(env)
+      .filter( l => dateFormat.parse(l.shipDate).after(date) )
+      .as('id, 'extdPrice, 'discount, 'shipDate)
+
+    val customers = getCustomerDataSet(env)
+      .as('id, 'mktSegment)
+      .filter( 'mktSegment === "AUTOMOBILE" )
+
+    val orders = getOrdersDataSet(env)
+      .filter( o => dateFormat.parse(o.orderDate).before(date) )
+      .as('orderId, 'custId, 'orderDate, 'shipPrio)
+
+    val items =
+      orders.join(customers)
+        .where('custId === 'id)
+        .select('orderId, 'orderDate, 'shipPrio)
+      .join(lineitems)
+        .where('orderId === 'id)
+        .select(
+          'orderId,
+          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
+          'orderDate,
+          'shipPrio)
+
+    val result = items
+      .groupBy('orderId, 'orderDate, 'shipPrio)
+      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
+
+    // emit result
+    result.writeAsCsv(outputPath, "\n", "|")
+
+    // execute program
+    env.execute("Scala TPCH Query 3 (Expression) Example")
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
+  case class Customer(id: Long, mktSegment: String)
+  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  private var lineitemPath: String = null
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var outputPath: String = null
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 4) {
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else {
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+        " Due to legal restrictions, we can not ship generated data.\n" +
+        " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+        " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
+        "<orders-csv path> <result path>");
+      false
+    }
+  }
+
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+    env.readCsvFile[Lineitem](
+      lineitemPath,
+      fieldDelimiter = "|",
+      includedFields = Array(0, 5, 6, 10) )
+  }
+
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+    env.readCsvFile[Customer](
+      customerPath,
+      fieldDelimiter = "|",
+      includedFields = Array(0, 6) )
+  }
+
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+    env.readCsvFile[Order](
+      ordersPath,
+      fieldDelimiter = "|",
+      includedFields = Array(0, 1, 4, 7) )
+  }
+
+}
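[Editor's aside, not part of this commit: the constant in the revenue expression is wrapped as Literal(1.0f), presumably because a plain Scala Float has no operator accepting a Table API expression, so the wrapper is what turns the subtraction into an expression-tree node. A plain-Scala sketch, with no Flink dependencies, of the aggregation the groupBy/select above expresses:

object RevenueSketch extends App {
  case class Item(orderId: Long, extdPrice: Double, discount: Double)

  // GROUP BY l_orderkey; SUM(l_extendedprice * (1 - l_discount)) AS revenue
  def revenuePerOrder(items: Seq[Item]): Map[Long, Double] =
    items.groupBy(_.orderId).map { case (id, group) =>
      id -> group.map(i => i.extdPrice * (1.0 - i.discount)).sum
    }

  println(revenuePerOrder(Seq(Item(1, 100.0, 0.1), Item(1, 50.0, 0.0), Item(2, 20.0, 0.5))))
  // Map(1 -> 140.0, 2 -> 10.0)
}]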
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 0b2a5df..60fb984 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -85,10 +85,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table table = tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env));
+		Table table = tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env));
 
-		Table result =
-				table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
+		Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
 
 		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
 		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
@@ -103,10 +102,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env));
 
-		Table result =
+		Table result =
 				table.select("foo.avg");
 
 		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
@@ -127,10 +126,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 				new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
 				new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
 
-		Table table =
+		Table table =
 				tableEnv.toTable(input);
 
-		Table result =
+		Table result =
 				table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
 
 		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
@@ -151,10 +150,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 				new Tuple2<Float, String>(1f, "Hello"),
 				new Tuple2<Float, String>(2f, "Ciao"));
 
-		Table table =
+		Table table =
 				tableEnv.toTable(input);
 
-		Table result =
+		Table result =
 				table.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\"");
 
@@ -174,10 +173,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		DataSource<Tuple2<Float, String>> input =
 				env.fromElements(new Tuple2<Float, String>(1f, "Hello"));
 
-		Table table =
+		Table table =
 				tableEnv.toTable(input);
 
-		Table result =
+		Table result =
 				table.select("f1.sum");
 
@@ -196,10 +195,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		DataSource<Tuple2<Float, String>> input =
 				env.fromElements(new Tuple2<Float, String>(1f, "Hello"));
 
-		Table table =
+		Table table =
 				tableEnv.toTable(input);
 
-		Table result =
+		Table result =
 				table.select("f0.sum.sum");
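[Editor's aside, not part of this commit: aside from joining the split "Table result = ..." declaration in the first hunk, these changes only normalize whitespace around the declarations. For reference, a plain-Scala sketch of the five aggregates the first hunk selects; the "foo.avg" and "f0.sum.sum" selections exercise what are presumably expected-failure cases, a nonexistent field and a nested aggregate:

object AggregationSketch extends App {
  // f0.sum, f0.min, f0.max, f0.count, f0.avg over a sample column
  val f0 = Seq(1, 2, 3, 4, 5)
  val avg = f0.sum.toDouble / f0.size
  println(s"${f0.sum},${f0.min},${f0.max},${f0.size},$avg") // 15,1,5,5,3.0
}]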
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index ee877e9..6ec3187 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -65,7 +65,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -86,7 +86,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -102,7 +102,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -118,7 +118,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -134,7 +134,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -150,7 +150,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," +
 					" c");
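[Editor's aside, not part of this commit: the AsITCase hunks cover renaming with a correct name list ("a, b, c") as well as too few, too many, duplicate, and non-name arguments. A hypothetical plain-Scala sketch (names invented for illustration, not the actual validation code) of the checks such a toTable(..., "...") call has to perform:

object FieldNameSketch extends App {
  def validate(names: Seq[String], arity: Int): Either[String, Seq[String]] =
    if (names.length != arity) Left(s"expected $arity field names, got ${names.length}")
    else if (names.distinct.length != names.length) Left("duplicate field name")
    else if (!names.forall(_.matches("[A-Za-z_][A-Za-z0-9_]*"))) Left("not a plain field name")
    else Right(names)

  println(validate(Seq("a", "b", "c"), 3))     // Right(List(a, b, c))
  println(validate(Seq("a", "b"), 3))          // Left(expected 3 field names, got 2)
  println(validate(Seq("a", "b", "b"), 3))     // Left(duplicate field name)
  println(validate(Seq("a + 1", "b", "c"), 3)) // Left(not a plain field name)
}]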