Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A016018345 for ; Thu, 25 Feb 2016 09:45:50 +0000 (UTC) Received: (qmail 79953 invoked by uid 500); 25 Feb 2016 09:45:28 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 79914 invoked by uid 500); 25 Feb 2016 09:45:28 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 79837 invoked by uid 99); 25 Feb 2016 09:45:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Feb 2016 09:45:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1B5E4E0446; Thu, 25 Feb 2016 09:45:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: twalthr@apache.org To: commits@flink.apache.org Date: Thu, 25 Feb 2016 09:45:28 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-3226] Translation of scalar function substring() Repository: flink Updated Branches: refs/heads/tableOnCalcite 20127ab71 -> 663101325 [FLINK-3226] Translation of scalar function substring() Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e220745 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e220745 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e220745 Branch: refs/heads/tableOnCalcite Commit: 3e22074553cedde1c3a1754d91d26a9f35ac3c2f Parents: 1c53c87 Author: twalthr Authored: Sat Feb 20 21:41:44 2016 +0100 Committer: twalthr 
Committed: Thu Feb 25 10:32:41 2016 +0100 ---------------------------------------------------------------------- .../flink/api/scala/table/expressionDsl.scala | 8 +- .../flink/api/table/codegen/CodeGenerator.scala | 19 +- .../api/table/codegen/OperatorCodeGen.scala | 462 ------------------ .../api/table/codegen/calls/CallGenerator.scala | 30 ++ .../codegen/calls/MethodCallGenerator.scala | 62 +++ .../table/codegen/calls/ScalarFunctions.scala | 72 +++ .../table/codegen/calls/ScalarOperators.scala | 463 +++++++++++++++++++ .../table/expressions/stringExpressions.scala | 21 +- .../api/table/parser/ExpressionParser.scala | 4 +- .../api/table/plan/RexNodeTranslator.scala | 35 +- .../flink/api/java/table/test/AsITCase.java | 4 +- .../api/java/table/test/CastingITCase.java | 2 +- .../api/java/table/test/ExpressionsITCase.java | 2 +- .../flink/api/java/table/test/FilterITCase.java | 2 +- .../flink/api/java/table/test/SelectITCase.java | 2 +- .../table/test/StringExpressionsITCase.java | 10 +- .../flink/api/scala/table/test/AsITCase.scala | 5 +- .../scala/table/test/ExpressionsITCase.scala | 5 +- .../api/scala/table/test/FilterITCase.scala | 5 +- .../api/scala/table/test/SelectITCase.scala | 5 +- .../table/test/StringExpressionsITCase.scala | 14 +- .../api/table/test/ScalarFunctionsTest.scala | 96 ++++ .../api/table/test/TableProgramsTestBase.scala | 97 ---- .../table/test/utils/ExpressionEvaluator.scala | 93 ++++ .../test/utils/TableProgramsTestBase.scala | 97 ++++ 25 files changed, 1006 insertions(+), 609 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index 058ff0e..68914da 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -63,8 +63,12 @@ trait ImplicitExpressionOperations { def count = Count(expr) def avg = Avg(expr) - def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = { - Substring(expr, beginIndex, endIndex) + def substring(beginIndex: Expression, endIndex: Expression) = { + Substring(expr, beginIndex, Some(endIndex)) + } + + def substring(beginIndex: Expression) = { + Substring(expr, beginIndex) } def cast(toType: TypeInformation[_]) = Cast(expr, toType) http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index add5627..a052618 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -19,9 +19,10 @@ package org.apache.flink.api.table.codegen import org.apache.calcite.rex._ +import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.fun.SqlStdOperatorTable._ -import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction} +import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction, Function, MapFunction} import org.apache.flink.api.common.typeinfo.{AtomicType, 
TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} @@ -29,12 +30,13 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.table.TableConfig import org.apache.flink.api.table.codegen.CodeGenUtils._ import org.apache.flink.api.table.codegen.Indenter.toISC -import org.apache.flink.api.table.codegen.OperatorCodeGen._ +import org.apache.flink.api.table.codegen.calls.ScalarFunctions +import org.apache.flink.api.table.codegen.calls.ScalarOperators._ import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo import org.apache.flink.api.table.typeinfo.RowTypeInfo + import scala.collection.JavaConversions._ import scala.collection.mutable -import org.apache.flink.api.common.functions.FlatJoinFunction /** * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s. @@ -540,7 +542,7 @@ class CodeGenerator( } case INTEGER => val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal]) - if (decimal.isValidShort) { + if (decimal.isValidInt) { generateNonNullLiteral(resultType, decimal.intValue().toString) } else { @@ -702,10 +704,19 @@ class CodeGenerator( requireBoolean(operand) generateNot(nullCheck, operand) + // casting case CAST => val operand = operands.head generateCast(nullCheck, operand, resultType) + // advanced scalar functions + case call: SqlOperator => + val callGen = ScalarFunctions.getCallGenerator(call, operands.map(_.resultType)) + callGen + .getOrElse(throw new CodeGenException(s"Unsupported call: $call")) + .generate(this, operands) + + // unknown or invalid case call@_ => throw new CodeGenException(s"Unsupported call: $call") } http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala deleted file mode 100644 index a61bbe4..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala +++ /dev/null @@ -1,462 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.codegen - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, BasicTypeInfo, TypeInformation} -import org.apache.flink.api.table.codegen.CodeGenUtils._ - -object OperatorCodeGen { - - def generateArithmeticOperator( - operator: String, - nullCheck: Boolean, - resultType: TypeInformation[_], - left: GeneratedExpression, - right: GeneratedExpression) - : GeneratedExpression = { - // String arithmetic // TODO rework - if (isString(left)) { - generateOperatorIfNotNull(nullCheck, resultType, left, right) { - (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm" - } - } - // Numeric arithmetic - else if (isNumeric(left) && isNumeric(right)) { - val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]] - val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]] - val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) - - generateOperatorIfNotNull(nullCheck, resultType, left, right) { - (leftTerm, rightTerm) => - // no casting required - if (leftType == resultType && rightType == resultType) { - s"$leftTerm $operator $rightTerm" - } - // left needs casting - else if (leftType != resultType && rightType == resultType) { - s"(($resultTypeTerm) $leftTerm) $operator $rightTerm" - } - // right needs casting - else if (leftType == resultType && rightType != resultType) { - s"$leftTerm $operator (($resultTypeTerm) $rightTerm)" - } - // both sides need casting - else { - s"(($resultTypeTerm) $leftTerm) $operator (($resultTypeTerm) $rightTerm)" - } - } - } - else { - throw new CodeGenException("Unsupported arithmetic operation.") - } - } - - def generateUnaryArithmeticOperator( - operator: String, - nullCheck: Boolean, - resultType: TypeInformation[_], - operand: GeneratedExpression) - : GeneratedExpression = { - generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) { - (operandTerm) => s"$operator($operandTerm)" - } - } - - def 
generateEquals( - nullCheck: Boolean, - left: GeneratedExpression, - right: GeneratedExpression) - : GeneratedExpression = { - generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { - if (isReference(left)) { - (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)" - } - else if (isReference(right)) { - (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)" - } - else { - (leftTerm, rightTerm) => s"$leftTerm == $rightTerm" - } - } - } - - def generateNotEquals( - nullCheck: Boolean, - left: GeneratedExpression, - right: GeneratedExpression) - : GeneratedExpression = { - generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { - if (isReference(left)) { - (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))" - } - else if (isReference(right)) { - (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))" - } - else { - (leftTerm, rightTerm) => s"$leftTerm != $rightTerm" - } - } - } - - def generateComparison( - operator: String, - nullCheck: Boolean, - left: GeneratedExpression, - right: GeneratedExpression) - : GeneratedExpression = { - generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { - if (isString(left) && isString(right)) { - (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0" - } - else if (isNumeric(left) && isNumeric(right)) { - (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm" - } - else { - throw new CodeGenException("Comparison is only supported for Strings and numeric types.") - } - } - } - - def generateIsNull( - nullCheck: Boolean, - operand: GeneratedExpression) - : GeneratedExpression = { - val resultTerm = newName("result") - val nullTerm = newName("isNull") - val operatorCode = if (nullCheck) { - s""" - |${operand.code} - |boolean $resultTerm = ${operand.nullTerm}; - |boolean $nullTerm = false; - |""".stripMargin - } - else if (!nullCheck && isReference(operand.resultType)) { - s""" - |${operand.code} - |boolean $resultTerm = ${operand.resultTerm} == null; - 
|boolean $nullTerm = false; - |""".stripMargin - } - else { - s""" - |${operand.code} - |boolean $resultTerm = false; - |""".stripMargin - } - - GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO) - } - - def generateIsNotNull( - nullCheck: Boolean, - operand: GeneratedExpression) - : GeneratedExpression = { - val resultTerm = newName("result") - val nullTerm = newName("isNull") - val operatorCode = if (nullCheck) { - s""" - |${operand.code} - |boolean $resultTerm = !${operand.nullTerm}; - |boolean $nullTerm = false; - |""".stripMargin - } - else if (!nullCheck && isReference(operand.resultType)) { - s""" - |${operand.code} - |boolean $resultTerm = ${operand.resultTerm} != null; - |boolean $nullTerm = false; - |""".stripMargin - } - else { - s""" - |${operand.code} - |boolean $resultTerm = true; - |""".stripMargin - } - - GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO) - } - - def generateAnd( - nullCheck: Boolean, - left: GeneratedExpression, - right: GeneratedExpression) - : GeneratedExpression = { - val resultTerm = newName("result") - val nullTerm = newName("isNull") - - val operatorCode = if (nullCheck) { - // Three-valued logic: - // no Unknown -> Two-valued logic - // True && Unknown -> Unknown - // False && Unknown -> False - // Unknown && True -> Unknown - // Unknown && False -> False - // Unknown && Unknown -> Unknown - s""" - |${left.code} - |${right.code} - |boolean $resultTerm; - |boolean $nullTerm; - |if (!${left.nullTerm} && !${right.nullTerm}) { - | $resultTerm = ${left.resultTerm} && ${right.resultTerm}; - | $nullTerm = false; - |} - |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) { - | $resultTerm = false; - | $nullTerm = true; - |} - |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) { - | $resultTerm = false; - | $nullTerm = false; - |} - |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) { - | $resultTerm = false; - | $nullTerm 
= true; - |} - |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) { - | $resultTerm = false; - | $nullTerm = false; - |} - |else { - | $resultTerm = false; - | $nullTerm = true; - |} - |""".stripMargin - } - else { - s""" - |${left.code} - |${right.code} - |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm}; - |""".stripMargin - } - - GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO) - } - - def generateOr( - nullCheck: Boolean, - left: GeneratedExpression, - right: GeneratedExpression) - : GeneratedExpression = { - val resultTerm = newName("result") - val nullTerm = newName("isNull") - - val operatorCode = if (nullCheck) { - // Three-valued logic: - // no Unknown -> Two-valued logic - // True && Unknown -> True - // False && Unknown -> Unknown - // Unknown && True -> True - // Unknown && False -> Unknown - // Unknown && Unknown -> Unknown - s""" - |${left.code} - |${right.code} - |boolean $resultTerm; - |boolean $nullTerm; - |if (!${left.nullTerm} && !${right.nullTerm}) { - | $resultTerm = ${left.resultTerm} || ${right.resultTerm}; - | $nullTerm = false; - |} - |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) { - | $resultTerm = true; - | $nullTerm = false; - |} - |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) { - | $resultTerm = false; - | $nullTerm = true; - |} - |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) { - | $resultTerm = true; - | $nullTerm = false; - |} - |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) { - | $resultTerm = false; - | $nullTerm = true; - |} - |else { - | $resultTerm = false; - | $nullTerm = true; - |} - |""".stripMargin - } - else { - s""" - |${left.code} - |${right.code} - |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm}; - |""".stripMargin - } - - GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO) - } - - def generateNot( - nullCheck: 
Boolean, - operand: GeneratedExpression) - : GeneratedExpression = { - // Three-valued logic: - // no Unknown -> Two-valued logic - // Unknown -> Unknown - generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) { - (operandTerm) => s"!($operandTerm)" - } - } - - def generateCast( - nullCheck: Boolean, - operand: GeneratedExpression, - targetType: TypeInformation[_]) - : GeneratedExpression = { - targetType match { - // identity casting - case operand.resultType => - generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { - (operandTerm) => s"$operandTerm" - } - - // * -> String - case STRING_TYPE_INFO => - generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { - (operandTerm) => s""" "" + $operandTerm""" - } - - // * -> Date - case DATE_TYPE_INFO => - throw new CodeGenException("Date type not supported yet.") - - // * -> Void - case VOID_TYPE_INFO => - throw new CodeGenException("Void type not supported.") - - // * -> Character - case CHAR_TYPE_INFO => - throw new CodeGenException("Character type not supported.") - - // NUMERIC TYPE -> Boolean - case BOOLEAN_TYPE_INFO if isNumeric(operand) => - generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { - (operandTerm) => s"$operandTerm != 0" - } - - // String -> BASIC TYPE (not String, Date, Void, Character) - case ti: BasicTypeInfo[_] if isString(operand) => - val wrapperClass = targetType.getTypeClass.getCanonicalName - generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { - (operandTerm) => s"$wrapperClass.valueOf($operandTerm)" - } - - // NUMERIC TYPE -> NUMERIC TYPE - case nti: NumericTypeInfo[_] if isNumeric(operand) => - val targetTypeTerm = primitiveTypeTermForTypeInfo(nti) - generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { - (operandTerm) => s"($targetTypeTerm) $operandTerm" - } - - // Boolean -> NUMERIC TYPE - case nti: NumericTypeInfo[_] if isBoolean(operand) => - val targetTypeTerm = primitiveTypeTermForTypeInfo(nti) - 
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { - (operandTerm) => s"($targetTypeTerm) ($operandTerm ? 1 : 0)" - } - - case _ => - throw new CodeGenException(s"Unsupported cast from '${operand.resultType}'" + - s"to '$targetType'.") - } - } - - // ---------------------------------------------------------------------------------------------- - - private def generateUnaryOperatorIfNotNull( - nullCheck: Boolean, - resultType: TypeInformation[_], - operand: GeneratedExpression) - (expr: (String) => String) - : GeneratedExpression = { - val resultTerm = newName("result") - val nullTerm = newName("isNull") - val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) - val defaultValue = primitiveDefaultValue(resultType) - - val operatorCode = if (nullCheck) { - s""" - |${operand.code} - |$resultTypeTerm $resultTerm; - |boolean $nullTerm; - |if (!${operand.nullTerm}) { - | $resultTerm = ${expr(operand.resultTerm)}; - | $nullTerm = false; - |} - |else { - | $resultTerm = $defaultValue; - | $nullTerm = true; - |} - |""".stripMargin - } - else { - s""" - |${operand.code} - |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)}; - |""".stripMargin - } - - GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType) - } - - private def generateOperatorIfNotNull( - nullCheck: Boolean, - resultType: TypeInformation[_], - left: GeneratedExpression, - right: GeneratedExpression) - (expr: (String, String) => String) - : GeneratedExpression = { - val resultTerm = newName("result") - val nullTerm = newName("isNull") - val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) - val defaultValue = primitiveDefaultValue(resultType) - - val resultCode = if (nullCheck) { - s""" - |${left.code} - |${right.code} - |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm}; - |$resultTypeTerm $resultTerm; - |if ($nullTerm) { - | $resultTerm = $defaultValue; - |} - |else { - | $resultTerm = ${expr(left.resultTerm, right.resultTerm)}; - |} - |""".stripMargin 
- } - else { - s""" - |${left.code} - |${right.code} - |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, right.resultTerm)}; - |""".stripMargin - } - - GeneratedExpression(resultTerm, nullTerm, resultCode, resultType) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala new file mode 100644 index 0000000..561dac7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.codegen.calls + +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression} + +trait CallGenerator { + + def generate( + codeGenerator: CodeGenerator, + operands: Seq[GeneratedExpression]) + : GeneratedExpression + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala new file mode 100644 index 0000000..f5ab6bf --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.codegen.calls + +import java.lang.reflect.Method + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.codegen.CodeGenUtils._ +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression} + +class MethodCallGenerator(returnType: TypeInformation[_], method: Method) extends CallGenerator { + + override def generate( + codeGenerator: CodeGenerator, + operands: Seq[GeneratedExpression]) + : GeneratedExpression = { + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType) + val defaultValue = primitiveDefaultValue(returnType) + + val resultCode = if (codeGenerator.nullCheck) { + s""" + |${operands.map(_.code).mkString("\n")} + |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")}; + |$resultTypeTerm $resultTerm; + |if ($nullTerm) { + | $resultTerm = $defaultValue; + |} + |else { + | $resultTerm = ${method.getDeclaringClass.getCanonicalName}.${method.getName}( + | ${operands.map(_.resultTerm).mkString(", ")}); + |} + |""".stripMargin + } + else { + s""" + |${operands.map(_.code).mkString("\n")} + |$resultTypeTerm $resultTerm = ${method.getDeclaringClass.getCanonicalName}. 
+ | ${method.getName}(${operands.map(_.resultTerm).mkString(", ")}); + |""".stripMargin + } + + GeneratedExpression(resultTerm, nullTerm, resultCode, returnType) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala new file mode 100644 index 0000000..8eda314 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.codegen.calls + +import java.lang.reflect.Method + +import org.apache.calcite.sql.SqlOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable._ +import org.apache.calcite.util.BuiltInMethod +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation + +import scala.collection.mutable + +/** + * Global registry of built-in advanced SQL scalar functions. + */ +object ScalarFunctions { + + private val sqlFunctions: mutable.Map[(SqlOperator, Seq[TypeInformation[_]]), CallGenerator] = + mutable.Map() + + // ---------------------------------------------------------------------------------------------- + addSqlFunctionMethod( + SUBSTRING, + Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO), + STRING_TYPE_INFO, + BuiltInMethod.SUBSTRING.method) + + addSqlFunctionMethod( + SUBSTRING, + Seq(STRING_TYPE_INFO, INT_TYPE_INFO), + STRING_TYPE_INFO, + BuiltInMethod.SUBSTRING.method) + + // ---------------------------------------------------------------------------------------------- + + def getCallGenerator( + call: SqlOperator, + operandTypes: Seq[TypeInformation[_]]) + : Option[CallGenerator] = { + sqlFunctions.get((call, operandTypes)) + } + + // ---------------------------------------------------------------------------------------------- + + private def addSqlFunctionMethod( + sqlOperator: SqlOperator, + operandTypes: Seq[TypeInformation[_]], + returnType: TypeInformation[_], + method: Method) + : Unit = { + sqlFunctions((sqlOperator, operandTypes)) = new MethodCallGenerator(returnType, method) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala new file mode 100644 index 0000000..8580b25 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.codegen.calls + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo, TypeInformation} +import org.apache.flink.api.table.codegen.CodeGenUtils._ +import org.apache.flink.api.table.codegen.{CodeGenException, GeneratedExpression} + +object ScalarOperators { + + def generateArithmeticOperator( + operator: String, + nullCheck: Boolean, + resultType: TypeInformation[_], + left: GeneratedExpression, + right: GeneratedExpression) + : GeneratedExpression = { + // String arithmetic // TODO rework + if (isString(left)) { + generateOperatorIfNotNull(nullCheck, resultType, left, right) { + (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm" + } + } + // Numeric arithmetic + else if (isNumeric(left) && isNumeric(right)) { + val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]] + val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]] + val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) + + generateOperatorIfNotNull(nullCheck, resultType, left, right) { + (leftTerm, rightTerm) => + // no casting required + if (leftType == resultType && rightType == resultType) { + s"$leftTerm $operator $rightTerm" + } + // left needs casting + else if (leftType != resultType && rightType == resultType) { + s"(($resultTypeTerm) $leftTerm) $operator $rightTerm" + } + // right needs casting + else if (leftType == resultType && rightType != resultType) { + s"$leftTerm $operator (($resultTypeTerm) $rightTerm)" + } + // both sides need casting + else { + s"(($resultTypeTerm) $leftTerm) $operator (($resultTypeTerm) $rightTerm)" + } + } + } + else { + throw new CodeGenException("Unsupported arithmetic operation.") + } + } + + def generateUnaryArithmeticOperator( + operator: String, + nullCheck: Boolean, + resultType: TypeInformation[_], + operand: GeneratedExpression) + : GeneratedExpression = { + generateUnaryOperatorIfNotNull(nullCheck, 
resultType, operand) { + (operandTerm) => s"$operator($operandTerm)" + } + } + + def generateEquals( + nullCheck: Boolean, + left: GeneratedExpression, + right: GeneratedExpression) + : GeneratedExpression = { + generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { + if (isReference(left)) { + (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)" + } + else if (isReference(right)) { + (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)" + } + else { + (leftTerm, rightTerm) => s"$leftTerm == $rightTerm" + } + } + } + + def generateNotEquals( + nullCheck: Boolean, + left: GeneratedExpression, + right: GeneratedExpression) + : GeneratedExpression = { + generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { + if (isReference(left)) { + (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))" + } + else if (isReference(right)) { + (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))" + } + else { + (leftTerm, rightTerm) => s"$leftTerm != $rightTerm" + } + } + } + + def generateComparison( + operator: String, + nullCheck: Boolean, + left: GeneratedExpression, + right: GeneratedExpression) + : GeneratedExpression = { + generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { + if (isString(left) && isString(right)) { + (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0" + } + else if (isNumeric(left) && isNumeric(right)) { + (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm" + } + else { + throw new CodeGenException("Comparison is only supported for Strings and numeric types.") + } + } + } + + def generateIsNull( + nullCheck: Boolean, + operand: GeneratedExpression) + : GeneratedExpression = { + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val operatorCode = if (nullCheck) { + s""" + |${operand.code} + |boolean $resultTerm = ${operand.nullTerm}; + |boolean $nullTerm = false; + |""".stripMargin + } + else if (!nullCheck && isReference(operand.resultType)) { 
+ s""" + |${operand.code} + |boolean $resultTerm = ${operand.resultTerm} == null; + |boolean $nullTerm = false; + |""".stripMargin + } + else { + s""" + |${operand.code} + |boolean $resultTerm = false; + |""".stripMargin + } + + GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO) + } + + def generateIsNotNull( + nullCheck: Boolean, + operand: GeneratedExpression) + : GeneratedExpression = { + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val operatorCode = if (nullCheck) { + s""" + |${operand.code} + |boolean $resultTerm = !${operand.nullTerm}; + |boolean $nullTerm = false; + |""".stripMargin + } + else if (!nullCheck && isReference(operand.resultType)) { + s""" + |${operand.code} + |boolean $resultTerm = ${operand.resultTerm} != null; + |boolean $nullTerm = false; + |""".stripMargin + } + else { + s""" + |${operand.code} + |boolean $resultTerm = true; + |""".stripMargin + } + + GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO) + } + + def generateAnd( + nullCheck: Boolean, + left: GeneratedExpression, + right: GeneratedExpression) + : GeneratedExpression = { + val resultTerm = newName("result") + val nullTerm = newName("isNull") + + val operatorCode = if (nullCheck) { + // Three-valued logic: + // no Unknown -> Two-valued logic + // True && Unknown -> Unknown + // False && Unknown -> False + // Unknown && True -> Unknown + // Unknown && False -> False + // Unknown && Unknown -> Unknown + s""" + |${left.code} + |${right.code} + |boolean $resultTerm; + |boolean $nullTerm; + |if (!${left.nullTerm} && !${right.nullTerm}) { + | $resultTerm = ${left.resultTerm} && ${right.resultTerm}; + | $nullTerm = false; + |} + |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) { + | $resultTerm = false; + | $nullTerm = true; + |} + |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) { + | $resultTerm = false; + | $nullTerm = false; + |} + |else if (${left.nullTerm} && 
!${right.nullTerm} && ${right.resultTerm}) { + | $resultTerm = false; + | $nullTerm = true; + |} + |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) { + | $resultTerm = false; + | $nullTerm = false; + |} + |else { + | $resultTerm = false; + | $nullTerm = true; + |} + |""".stripMargin + } + else { + s""" + |${left.code} + |${right.code} + |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm}; + |""".stripMargin + } + + GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO) + } + + def generateOr( + nullCheck: Boolean, + left: GeneratedExpression, + right: GeneratedExpression) + : GeneratedExpression = { + val resultTerm = newName("result") + val nullTerm = newName("isNull") + + val operatorCode = if (nullCheck) { + // Three-valued logic for OR: + // no Unknown -> Two-valued logic + // True || Unknown -> True + // False || Unknown -> Unknown + // Unknown || True -> True + // Unknown || False -> Unknown + // Unknown || Unknown -> Unknown + s""" + |${left.code} + |${right.code} + |boolean $resultTerm; + |boolean $nullTerm; + |if (!${left.nullTerm} && !${right.nullTerm}) { + | $resultTerm = ${left.resultTerm} || ${right.resultTerm}; + | $nullTerm = false; + |} + |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) { + | $resultTerm = true; + | $nullTerm = false; + |} + |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) { + | $resultTerm = false; + | $nullTerm = true; + |} + |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) { + | $resultTerm = true; + | $nullTerm = false; + |} + |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) { + | $resultTerm = false; + | $nullTerm = true; + |} + |else { + | $resultTerm = false; + | $nullTerm = true; + |} + |""".stripMargin + } + else { + s""" + |${left.code} + |${right.code} + |boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm}; + |""".stripMargin + } + + GeneratedExpression(resultTerm,
nullTerm, operatorCode, BOOLEAN_TYPE_INFO) + } + + def generateNot( + nullCheck: Boolean, + operand: GeneratedExpression) + : GeneratedExpression = { + // Three-valued logic: + // no Unknown -> Two-valued logic + // Unknown -> Unknown + generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) { + (operandTerm) => s"!($operandTerm)" + } + } + + def generateCast( + nullCheck: Boolean, + operand: GeneratedExpression, + targetType: TypeInformation[_]) + : GeneratedExpression = { + targetType match { + // identity casting + case operand.resultType => + generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { + (operandTerm) => s"$operandTerm" + } + + // * -> String + case STRING_TYPE_INFO => + generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { + (operandTerm) => s""" "" + $operandTerm""" + } + + // * -> Date + case DATE_TYPE_INFO => + throw new CodeGenException("Date type not supported yet.") + + // * -> Void + case VOID_TYPE_INFO => + throw new CodeGenException("Void type not supported.") + + // * -> Character + case CHAR_TYPE_INFO => + throw new CodeGenException("Character type not supported.") + + // NUMERIC TYPE -> Boolean + case BOOLEAN_TYPE_INFO if isNumeric(operand) => + generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { + (operandTerm) => s"$operandTerm != 0" + } + + // String -> BASIC TYPE (not String, Date, Void, Character) + case ti: BasicTypeInfo[_] if isString(operand) => + val wrapperClass = targetType.getTypeClass.getCanonicalName + generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { + (operandTerm) => s"$wrapperClass.valueOf($operandTerm)" + } + + // NUMERIC TYPE -> NUMERIC TYPE + case nti: NumericTypeInfo[_] if isNumeric(operand) => + val targetTypeTerm = primitiveTypeTermForTypeInfo(nti) + generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { + (operandTerm) => s"($targetTypeTerm) $operandTerm" + } + + // Boolean -> NUMERIC TYPE + case nti: NumericTypeInfo[_] if isBoolean(operand) 
=> + val targetTypeTerm = primitiveTypeTermForTypeInfo(nti) + generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { + (operandTerm) => s"($targetTypeTerm) ($operandTerm ? 1 : 0)" + } + + case _ => + throw new CodeGenException(s"Unsupported cast from '${operand.resultType}' " + + s"to '$targetType'.") + } + } + + // ---------------------------------------------------------------------------------------------- + + private def generateUnaryOperatorIfNotNull( + nullCheck: Boolean, + resultType: TypeInformation[_], + operand: GeneratedExpression) + (expr: (String) => String) + : GeneratedExpression = { + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) + val defaultValue = primitiveDefaultValue(resultType) + + val operatorCode = if (nullCheck) { + s""" + |${operand.code} + |$resultTypeTerm $resultTerm; + |boolean $nullTerm; + |if (!${operand.nullTerm}) { + | $resultTerm = ${expr(operand.resultTerm)}; + | $nullTerm = false; + |} + |else { + | $resultTerm = $defaultValue; + | $nullTerm = true; + |} + |""".stripMargin + } + else { + s""" + |${operand.code} + |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)}; + |""".stripMargin + } + + GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType) + } + + private def generateOperatorIfNotNull( + nullCheck: Boolean, + resultType: TypeInformation[_], + left: GeneratedExpression, + right: GeneratedExpression) + (expr: (String, String) => String) + : GeneratedExpression = { + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) + val defaultValue = primitiveDefaultValue(resultType) + + val resultCode = if (nullCheck) { + s""" + |${left.code} + |${right.code} + |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm}; + |$resultTypeTerm $resultTerm; + |if ($nullTerm) { + | $resultTerm = $defaultValue; + |} + |else { + | $resultTerm =
${expr(left.resultTerm, right.resultTerm)}; + |} + |""".stripMargin + } + else { + s""" + |${left.code} + |${right.code} + |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, right.resultTerm)}; + |""".stripMargin + } + + GeneratedExpression(resultTerm, nullTerm, resultCode, resultType) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala index a39d601..bf551dd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo} case class Substring( str: Expression, beginIndex: Expression, - endIndex: Expression) extends Expression { + endIndex: Option[Expression] = None) extends Expression { def typeInfo = { if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) { throw new ExpressionException( @@ -33,14 +33,23 @@ case class Substring( throw new ExpressionException( s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""") } - if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""") + endIndex match { + case Some(endIdx) if !endIdx.typeInfo.isInstanceOf[IntegerTypeInfo[_]] => + throw new ExpressionException( + s"""End index must be an integer type in $this, is ${endIdx.typeInfo}.""") + case _ => // ok } BasicTypeInfo.STRING_TYPE_INFO } - override def children: 
Seq[Expression] = Seq(str, beginIndex, endIndex) - override def toString = s"($str).substring($beginIndex, $endIndex)" + override def children: Seq[Expression] = endIndex match { + case Some(endIdx) => Seq(str, beginIndex, endIdx) + case None => Seq(str, beginIndex) + } + + override def toString = endIndex match { + case Some(endIdx) => s"($str).substring($beginIndex, $endIdx)" + case None => s"($str).substring($beginIndex)" + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala index d082c7e..ebbbf8b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala @@ -125,13 +125,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val substring: PackratParser[Expression] = atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ { - case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to) + case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, Some(to)) } lazy val substringWithoutEnd: PackratParser[Expression] = atom ~ ".substring(" ~ expression ~ ")" ^^ { - case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE)) + case e ~ _ ~ from ~ _ => Substring(e, from) } http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---------------------------------------------------------------------- diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala index bad111f..fb9d057 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala @@ -52,17 +52,25 @@ object RexNodeTranslator { val l = extractAggCalls(b.left, relBuilder) val r = extractAggCalls(b.right, relBuilder) (b.makeCopy(List(l._1, r._1)), l._2 ::: r._2) - case s: Substring => + + // Scalar functions + case s@Substring(_, _, Some(endIndex)) => val str = extractAggCalls(s.str, relBuilder) val sta = extractAggCalls(s.beginIndex, relBuilder) - val end = extractAggCalls(s.endIndex, relBuilder) + val end = extractAggCalls(endIndex, relBuilder) (s.makeCopy( - List(str._1, sta._1, end._1)), + List(str._1, sta._1, Some(end._1))), (str._2 ::: sta._2) ::: end._2 ) - case e@_ => + + case s@Substring(_, _, None) => + val str = extractAggCalls(s.str, relBuilder) + val sta = extractAggCalls(s.beginIndex, relBuilder) + (s.makeCopy(List(str._1, sta._1, None)), str._2 ::: sta._2) + + case e => throw new IllegalArgumentException( - s"Expression ${e} of type ${e.getClass()} not supported yet") + s"Expression $e of type ${e.getClass} not supported yet") } } @@ -72,6 +80,7 @@ object RexNodeTranslator { def toRexNode(exp: Expression, relBuilder: RelBuilder): RexNode = { exp match { + // Basic operators case Literal(value, tpe) => relBuilder.literal(value) case ResolvedFieldReference(name, tpe) => @@ -151,16 +160,24 @@ object RexNodeTranslator { case UnaryMinus(child) => val c = toRexNode(child, relBuilder) relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, c) - case Substring(string, start, end) => + + // Scalar functions + case Substring(string, start, Some(end)) => val str = toRexNode(string, relBuilder) val sta =
toRexNode(start, relBuilder) val en = toRexNode(end, relBuilder) relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta, en) + + case Substring(string, start, None) => + val str = toRexNode(string, relBuilder) + val sta = toRexNode(start, relBuilder) + relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta) + case a: Aggregation => - throw new IllegalArgumentException(s"Aggregation expression ${a} not allowed at this place") - case e@_ => + throw new IllegalArgumentException(s"Aggregation expression $a not allowed at this place") + case e => throw new IllegalArgumentException( - s"Expression ${e} of type ${e.getClass()} not supported yet") + s"Expression $e of type ${e.getClass} not supported yet") } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java index e1c9e96..628cbef 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java @@ -25,13 +25,11 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.codegen.CodeGenException; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; -import org.apache.flink.api.table.test.TableProgramsTestBase; +import org.apache.flink.api.table.test.utils.TableProgramsTestBase; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java index e009cb8..5b22574 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java @@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.table.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.codegen.CodeGenException; -import org.apache.flink.api.table.test.TableProgramsTestBase; +import org.apache.flink.api.table.test.utils.TableProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java index 222f161..8c30163 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java @@ -26,7 +26,7 @@ import 
org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.table.test.TableProgramsTestBase; +import org.apache.flink.api.table.test.utils.TableProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java index b8ca4cd..c69d1a7 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java @@ -24,7 +24,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.table.test.TableProgramsTestBase; +import org.apache.flink.api.table.test.utils.TableProgramsTestBase; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.junit.Test; import org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java index c4ac138..3ce9891 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java @@ -24,7 +24,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.table.test.TableProgramsTestBase; +import org.apache.flink.api.table.test.utils.TableProgramsTestBase; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.junit.Test; import org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java index 315fe9f..6b8f984 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java @@ -40,7 +40,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { super(mode); } - @Test(expected = CodeGenException.class) + @Test public void testSubstring() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -52,7 +52,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { Table in = tableEnv.fromDataSet(ds, "a, 
b"); Table result = in - .select("a.substring(0, b)"); + .select("a.substring(1, b)"); DataSet resultSet = tableEnv.toDataSet(result, Row.class); List results = resultSet.collect(); @@ -60,14 +60,14 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = CodeGenException.class) + @Test public void testSubstringWithMaxEnd() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); DataSet> ds = env.fromElements( - new Tuple2<>("ABCD", 2), - new Tuple2<>("ABCD", 1)); + new Tuple2<>("ABCD", 3), + new Tuple2<>("ABCD", 2)); Table in = tableEnv.fromDataSet(ds, "a, b"); http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala index 54580c4..f2120ef 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala @@ -22,8 +22,9 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.Row -import org.apache.flink.api.table.test.TableProgramsTestBase -import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.utils.TableProgramsTestBase import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode 
import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala index 8ab44a3..ba0311a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala @@ -25,8 +25,9 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.Row import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.test.TableProgramsTestBase -import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.utils.TableProgramsTestBase import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala index 96b9341..c73d7eb 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala @@ -23,8 +23,9 @@ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.Row import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.test.TableProgramsTestBase -import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.utils.TableProgramsTestBase import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala index e181e0b..4a78079 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala @@ -22,8 +22,9 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.Row -import org.apache.flink.api.table.test.TableProgramsTestBase -import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import 
TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.utils.TableProgramsTestBase import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala index f2e9aca..c4fc346 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala @@ -18,35 +18,35 @@ package org.apache.flink.api.scala.table.test -import org.apache.flink.api.table.{Row, ExpressionException} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.api.table.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized + import scala.collection.JavaConverters._ -import org.apache.flink.api.table.codegen.CodeGenException @RunWith(classOf[Parameterized]) class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - @Test(expected = classOf[CodeGenException]) + @Test def testSubstring(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b) - 
.select('a.substring(0, 'b)) + .select('a.substring(1, 'b)) val expected = "AA\nB" val results = t.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[CodeGenException]) + @Test def testSubstringWithMaxEnd(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val t = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b) + val t = env.fromElements(("ABCD", 3), ("ABCD", 2)).as('a, 'b) .select('a.substring('b)) val expected = "CD\nBCD" http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala new file mode 100644 index 0000000..9989c1a --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.test + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.parser.ExpressionParser +import org.apache.flink.api.table.test.utils.ExpressionEvaluator +import org.apache.flink.api.table.typeinfo.RowTypeInfo +import org.junit.Assert.assertEquals +import org.junit.Test + +class ScalarFunctionsTest { + + @Test + def testSubstring(): Unit = { + testFunction( + 'f0.substring(2), + "f0.substring(2)", + "SUBSTRING(f0, 2)", + "his is a test String.") + + testFunction( + 'f0.substring(2, 5), + "f0.substring(2, 5)", + "SUBSTRING(f0, 2, 5)", + "his i") + + testFunction( + 'f0.substring(1, 'f7), + "f0.substring(1, f7)", + "SUBSTRING(f0, 1, f7)", + "Thi") + } + + // ---------------------------------------------------------------------------------------------- + + def testFunction( + expr: Expression, + exprString: String, + sqlExpr: String, + expected: String): Unit = { + val testData = new Row(8) + testData.setField(0, "This is a test String.") + testData.setField(1, true) + testData.setField(2, 42.toByte) + testData.setField(3, 43.toShort) + testData.setField(4, 44.toLong) + testData.setField(5, 4.5.toFloat) + testData.setField(6, 4.6) + testData.setField(7, 3) + + val typeInfo = new RowTypeInfo(Seq( + STRING_TYPE_INFO, + BOOLEAN_TYPE_INFO, + BYTE_TYPE_INFO, + SHORT_TYPE_INFO, + LONG_TYPE_INFO, + FLOAT_TYPE_INFO, + DOUBLE_TYPE_INFO, + INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]] + + val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr) + assertEquals(expected, exprResult) + + val exprStringResult = ExpressionEvaluator.evaluate( + testData, + typeInfo, + ExpressionParser.parseExpression(exprString)) + assertEquals(expected, exprStringResult) + + // TODO test SQL expression 
+ } + + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala deleted file mode 100644 index 0962f4e..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.table.test - -import java.util - -import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv} -import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv} -import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv} -import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv} -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL} -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit.runners.Parameterized - -import scala.collection.JavaConversions._ - -class TableProgramsTestBase( - mode: TestExecutionMode, - tableConfigMode: TableConfigMode) - extends MultipleProgramsTestBase(mode) { - - def getJavaTableEnvironment: JavaTableEnv = { - val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv - val tableEnv = new JavaTableEnv - configure(tableEnv.getConfig) - tableEnv - } - - def getScalaTableEnvironment: ScalaTableEnv = { - val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv - val tableEnv = new ScalaTableEnv - configure(tableEnv.getConfig) - tableEnv - } - - def getConfig: TableConfig = { - val config = new TableConfig() - configure(config) - config - } - - def configure(config: TableConfig): Unit = { - tableConfigMode match { - case NULL => - config.setNullCheck(true) - case EFFICIENT => - config.setEfficientTypeUsage(true) - case _ => // keep default - } - } - -} - -object TableProgramsTestBase { - sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean } - object TableConfigMode { - case object DEFAULT extends TableConfigMode { - val nullCheck = false; val efficientTypes = false - } - case object NULL extends TableConfigMode { - val nullCheck = true; val 
efficientTypes = false - } - case object EFFICIENT extends TableConfigMode { - val nullCheck = false; val efficientTypes = true - } - } - - @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") - def tableConfigs(): util.Collection[Array[java.lang.Object]] = { - Seq( - // TODO more tests in cluster mode? - Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT), - Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT), - Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL), - Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT) - ) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala new file mode 100644 index 0000000..b06ac26 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.test.utils + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.functions.{Function, MapFunction} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction} +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.plan.{RexNodeTranslator, TranslationContext} +import org.apache.flink.api.table.plan.schema.DataSetTable +import org.apache.flink.api.table.runtime.FunctionCompiler +import org.mockito.Mockito._ + +/** + * Utility to translate and evaluate an RexNode or Table API expression to a String. 
+ */ +object ExpressionEvaluator { + + // TestCompiler that uses current class loader + class TestCompiler[T <: Function] extends FunctionCompiler[T] { + def compile(genFunc: GeneratedFunction[T]): Class[T] = + compile(getClass.getClassLoader, genFunc.name, genFunc.code) + } + + private def prepareRelBuilder(typeInfo: TypeInformation[Any]): RelBuilder = { + // create DataSetTable + val dataSetMock = mock(classOf[DataSet[Any]]) + when(dataSetMock.getType).thenReturn(typeInfo) + val tableName = TranslationContext.addDataSet(new DataSetTable[Any]( + dataSetMock, + (0 until typeInfo.getArity).toArray, + (0 until typeInfo.getArity).map("f" + _).toArray)) + + // prepare RelBuilder + val relBuilder = TranslationContext.getRelBuilder + relBuilder.scan(tableName) + relBuilder + } + + def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = { + val relBuilder = prepareRelBuilder(typeInfo) + evaluate(data, typeInfo, relBuilder, RexNodeTranslator.toRexNode(expr, relBuilder)) + } + + def evaluate( + data: Any, + typeInfo: TypeInformation[Any], + relBuilder: RelBuilder, + rexNode: RexNode): String = { + // generate code for Mapper + val config = new TableConfig() + val generator = new CodeGenerator(config, typeInfo) + val genExpr = generator.generateExpression(relBuilder.cast(rexNode, VARCHAR)) // cast to String + val bodyCode = + s""" + |${genExpr.code} + |return ${genExpr.resultTerm}; + |""".stripMargin + val genFunc = generator.generateFunction[MapFunction[Any, String]]( + "TestFunction", + classOf[MapFunction[Any, String]], + bodyCode, + STRING_TYPE_INFO.asInstanceOf[TypeInformation[Any]]) + + // compile and evaluate + val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc) + val mapper = clazz.newInstance() + mapper.map(data) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala new file mode 100644 index 0000000..78be519 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.test.utils + +import java.util + +import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv} +import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv} +import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv} +import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv} +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL} +import org.apache.flink.test.util.MultipleProgramsTestBase +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit.runners.Parameterized + +import scala.collection.JavaConversions._ + +class TableProgramsTestBase( + mode: TestExecutionMode, + tableConfigMode: TableConfigMode) + extends MultipleProgramsTestBase(mode) { + + def getJavaTableEnvironment: JavaTableEnv = { + val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv + val tableEnv = new JavaTableEnv + configure(tableEnv.getConfig) + tableEnv + } + + def getScalaTableEnvironment: ScalaTableEnv = { + val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv + val tableEnv = new ScalaTableEnv + configure(tableEnv.getConfig) + tableEnv + } + + def getConfig: TableConfig = { + val config = new TableConfig() + configure(config) + config + } + + def configure(config: TableConfig): Unit = { + tableConfigMode match { + case NULL => + config.setNullCheck(true) + case EFFICIENT => + config.setEfficientTypeUsage(true) + case _ => // keep default + } + } + +} + +object TableProgramsTestBase { + sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean } + object TableConfigMode { + case object DEFAULT extends TableConfigMode { + val nullCheck = false; val efficientTypes = false + } + case object NULL extends TableConfigMode { + val nullCheck 
= true; val efficientTypes = false + } + case object EFFICIENT extends TableConfigMode { + val nullCheck = false; val efficientTypes = true + } + } + + @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") + def tableConfigs(): util.Collection[Array[java.lang.Object]] = { + Seq( + // TODO more tests in cluster mode? + Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT), + Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT), + Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL), + Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT) + ) + } +}