Repository: flink
Updated Branches:
refs/heads/tableOnCalcite 20127ab71 -> 663101325
[FLINK-3226] Translation of scalar function substring()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e220745
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e220745
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e220745
Branch: refs/heads/tableOnCalcite
Commit: 3e22074553cedde1c3a1754d91d26a9f35ac3c2f
Parents: 1c53c87
Author: twalthr <twalthr@apache.org>
Authored: Sat Feb 20 21:41:44 2016 +0100
Committer: twalthr <twalthr@apache.org>
Committed: Thu Feb 25 10:32:41 2016 +0100
----------------------------------------------------------------------
.../flink/api/scala/table/expressionDsl.scala | 8 +-
.../flink/api/table/codegen/CodeGenerator.scala | 19 +-
.../api/table/codegen/OperatorCodeGen.scala | 462 ------------------
.../api/table/codegen/calls/CallGenerator.scala | 30 ++
.../codegen/calls/MethodCallGenerator.scala | 62 +++
.../table/codegen/calls/ScalarFunctions.scala | 72 +++
.../table/codegen/calls/ScalarOperators.scala | 463 +++++++++++++++++++
.../table/expressions/stringExpressions.scala | 21 +-
.../api/table/parser/ExpressionParser.scala | 4 +-
.../api/table/plan/RexNodeTranslator.scala | 35 +-
.../flink/api/java/table/test/AsITCase.java | 4 +-
.../api/java/table/test/CastingITCase.java | 2 +-
.../api/java/table/test/ExpressionsITCase.java | 2 +-
.../flink/api/java/table/test/FilterITCase.java | 2 +-
.../flink/api/java/table/test/SelectITCase.java | 2 +-
.../table/test/StringExpressionsITCase.java | 10 +-
.../flink/api/scala/table/test/AsITCase.scala | 5 +-
.../scala/table/test/ExpressionsITCase.scala | 5 +-
.../api/scala/table/test/FilterITCase.scala | 5 +-
.../api/scala/table/test/SelectITCase.scala | 5 +-
.../table/test/StringExpressionsITCase.scala | 14 +-
.../api/table/test/ScalarFunctionsTest.scala | 96 ++++
.../api/table/test/TableProgramsTestBase.scala | 97 ----
.../table/test/utils/ExpressionEvaluator.scala | 93 ++++
.../test/utils/TableProgramsTestBase.scala | 97 ++++
25 files changed, 1006 insertions(+), 609 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 058ff0e..68914da 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -63,8 +63,12 @@ trait ImplicitExpressionOperations {
def count = Count(expr)
def avg = Avg(expr)
- def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = {
- Substring(expr, beginIndex, endIndex)
+ def substring(beginIndex: Expression, endIndex: Expression) = {
+ Substring(expr, beginIndex, Some(endIndex))
+ }
+
+ def substring(beginIndex: Expression) = {
+ Substring(expr, beginIndex)
}
def cast(toType: TypeInformation[_]) = Cast(expr, toType)
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index add5627..a052618 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -19,9 +19,10 @@
package org.apache.flink.api.table.codegen
import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
-import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction, Function, MapFunction}
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
@@ -29,12 +30,13 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.table.TableConfig
import org.apache.flink.api.table.codegen.CodeGenUtils._
import org.apache.flink.api.table.codegen.Indenter.toISC
-import org.apache.flink.api.table.codegen.OperatorCodeGen._
+import org.apache.flink.api.table.codegen.calls.ScalarFunctions
+import org.apache.flink.api.table.codegen.calls.ScalarOperators._
import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
import scala.collection.JavaConversions._
import scala.collection.mutable
-import org.apache.flink.api.common.functions.FlatJoinFunction
/**
* A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s.
@@ -540,7 +542,7 @@ class CodeGenerator(
}
case INTEGER =>
val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
- if (decimal.isValidShort) {
+ if (decimal.isValidInt) {
generateNonNullLiteral(resultType, decimal.intValue().toString)
}
else {
@@ -702,10 +704,19 @@ class CodeGenerator(
requireBoolean(operand)
generateNot(nullCheck, operand)
+ // casting
case CAST =>
val operand = operands.head
generateCast(nullCheck, operand, resultType)
+ // advanced scalar functions
+ case call: SqlOperator =>
+ val callGen = ScalarFunctions.getCallGenerator(call, operands.map(_.resultType))
+ callGen
+ .getOrElse(throw new CodeGenException(s"Unsupported call: $call"))
+ .generate(this, operands)
+
+ // unknown or invalid
case call@_ =>
throw new CodeGenException(s"Unsupported call: $call")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
deleted file mode 100644
index a61bbe4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
+++ /dev/null
@@ -1,462 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-
-object OperatorCodeGen {
-
- def generateArithmeticOperator(
- operator: String,
- nullCheck: Boolean,
- resultType: TypeInformation[_],
- left: GeneratedExpression,
- right: GeneratedExpression)
- : GeneratedExpression = {
- // String arithmetic // TODO rework
- if (isString(left)) {
- generateOperatorIfNotNull(nullCheck, resultType, left, right) {
- (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
- }
- }
- // Numeric arithmetic
- else if (isNumeric(left) && isNumeric(right)) {
- val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
- val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
- val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-
- generateOperatorIfNotNull(nullCheck, resultType, left, right) {
- (leftTerm, rightTerm) =>
- // no casting required
- if (leftType == resultType && rightType == resultType) {
- s"$leftTerm $operator $rightTerm"
- }
- // left needs casting
- else if (leftType != resultType && rightType == resultType) {
- s"(($resultTypeTerm) $leftTerm) $operator $rightTerm"
- }
- // right needs casting
- else if (leftType == resultType && rightType != resultType) {
- s"$leftTerm $operator (($resultTypeTerm) $rightTerm)"
- }
- // both sides need casting
- else {
- s"(($resultTypeTerm) $leftTerm) $operator (($resultTypeTerm) $rightTerm)"
- }
- }
- }
- else {
- throw new CodeGenException("Unsupported arithmetic operation.")
- }
- }
-
- def generateUnaryArithmeticOperator(
- operator: String,
- nullCheck: Boolean,
- resultType: TypeInformation[_],
- operand: GeneratedExpression)
- : GeneratedExpression = {
- generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) {
- (operandTerm) => s"$operator($operandTerm)"
- }
- }
-
- def generateEquals(
- nullCheck: Boolean,
- left: GeneratedExpression,
- right: GeneratedExpression)
- : GeneratedExpression = {
- generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
- if (isReference(left)) {
- (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
- }
- else if (isReference(right)) {
- (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)"
- }
- else {
- (leftTerm, rightTerm) => s"$leftTerm == $rightTerm"
- }
- }
- }
-
- def generateNotEquals(
- nullCheck: Boolean,
- left: GeneratedExpression,
- right: GeneratedExpression)
- : GeneratedExpression = {
- generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
- if (isReference(left)) {
- (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
- }
- else if (isReference(right)) {
- (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))"
- }
- else {
- (leftTerm, rightTerm) => s"$leftTerm != $rightTerm"
- }
- }
- }
-
- def generateComparison(
- operator: String,
- nullCheck: Boolean,
- left: GeneratedExpression,
- right: GeneratedExpression)
- : GeneratedExpression = {
- generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
- if (isString(left) && isString(right)) {
- (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0"
- }
- else if (isNumeric(left) && isNumeric(right)) {
- (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
- }
- else {
- throw new CodeGenException("Comparison is only supported for Strings and numeric types.")
- }
- }
- }
-
- def generateIsNull(
- nullCheck: Boolean,
- operand: GeneratedExpression)
- : GeneratedExpression = {
- val resultTerm = newName("result")
- val nullTerm = newName("isNull")
- val operatorCode = if (nullCheck) {
- s"""
- |${operand.code}
- |boolean $resultTerm = ${operand.nullTerm};
- |boolean $nullTerm = false;
- |""".stripMargin
- }
- else if (!nullCheck && isReference(operand.resultType)) {
- s"""
- |${operand.code}
- |boolean $resultTerm = ${operand.resultTerm} == null;
- |boolean $nullTerm = false;
- |""".stripMargin
- }
- else {
- s"""
- |${operand.code}
- |boolean $resultTerm = false;
- |""".stripMargin
- }
-
- GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
- }
-
- def generateIsNotNull(
- nullCheck: Boolean,
- operand: GeneratedExpression)
- : GeneratedExpression = {
- val resultTerm = newName("result")
- val nullTerm = newName("isNull")
- val operatorCode = if (nullCheck) {
- s"""
- |${operand.code}
- |boolean $resultTerm = !${operand.nullTerm};
- |boolean $nullTerm = false;
- |""".stripMargin
- }
- else if (!nullCheck && isReference(operand.resultType)) {
- s"""
- |${operand.code}
- |boolean $resultTerm = ${operand.resultTerm} != null;
- |boolean $nullTerm = false;
- |""".stripMargin
- }
- else {
- s"""
- |${operand.code}
- |boolean $resultTerm = true;
- |""".stripMargin
- }
-
- GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
- }
-
- def generateAnd(
- nullCheck: Boolean,
- left: GeneratedExpression,
- right: GeneratedExpression)
- : GeneratedExpression = {
- val resultTerm = newName("result")
- val nullTerm = newName("isNull")
-
- val operatorCode = if (nullCheck) {
- // Three-valued logic:
- // no Unknown -> Two-valued logic
- // True && Unknown -> Unknown
- // False && Unknown -> False
- // Unknown && True -> Unknown
- // Unknown && False -> False
- // Unknown && Unknown -> Unknown
- s"""
- |${left.code}
- |${right.code}
- |boolean $resultTerm;
- |boolean $nullTerm;
- |if (!${left.nullTerm} && !${right.nullTerm}) {
- | $resultTerm = ${left.resultTerm} && ${right.resultTerm};
- | $nullTerm = false;
- |}
- |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
- | $resultTerm = false;
- | $nullTerm = true;
- |}
- |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
- | $resultTerm = false;
- | $nullTerm = false;
- |}
- |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
- | $resultTerm = false;
- | $nullTerm = true;
- |}
- |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
- | $resultTerm = false;
- | $nullTerm = false;
- |}
- |else {
- | $resultTerm = false;
- | $nullTerm = true;
- |}
- |""".stripMargin
- }
- else {
- s"""
- |${left.code}
- |${right.code}
- |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
- |""".stripMargin
- }
-
- GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
- }
-
- def generateOr(
- nullCheck: Boolean,
- left: GeneratedExpression,
- right: GeneratedExpression)
- : GeneratedExpression = {
- val resultTerm = newName("result")
- val nullTerm = newName("isNull")
-
- val operatorCode = if (nullCheck) {
- // Three-valued logic:
- // no Unknown -> Two-valued logic
- // True && Unknown -> True
- // False && Unknown -> Unknown
- // Unknown && True -> True
- // Unknown && False -> Unknown
- // Unknown && Unknown -> Unknown
- s"""
- |${left.code}
- |${right.code}
- |boolean $resultTerm;
- |boolean $nullTerm;
- |if (!${left.nullTerm} && !${right.nullTerm}) {
- | $resultTerm = ${left.resultTerm} || ${right.resultTerm};
- | $nullTerm = false;
- |}
- |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
- | $resultTerm = true;
- | $nullTerm = false;
- |}
- |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
- | $resultTerm = false;
- | $nullTerm = true;
- |}
- |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
- | $resultTerm = true;
- | $nullTerm = false;
- |}
- |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
- | $resultTerm = false;
- | $nullTerm = true;
- |}
- |else {
- | $resultTerm = false;
- | $nullTerm = true;
- |}
- |""".stripMargin
- }
- else {
- s"""
- |${left.code}
- |${right.code}
- |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
- |""".stripMargin
- }
-
- GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
- }
-
- def generateNot(
- nullCheck: Boolean,
- operand: GeneratedExpression)
- : GeneratedExpression = {
- // Three-valued logic:
- // no Unknown -> Two-valued logic
- // Unknown -> Unknown
- generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) {
- (operandTerm) => s"!($operandTerm)"
- }
- }
-
- def generateCast(
- nullCheck: Boolean,
- operand: GeneratedExpression,
- targetType: TypeInformation[_])
- : GeneratedExpression = {
- targetType match {
- // identity casting
- case operand.resultType =>
- generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
- (operandTerm) => s"$operandTerm"
- }
-
- // * -> String
- case STRING_TYPE_INFO =>
- generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
- (operandTerm) => s""" "" + $operandTerm"""
- }
-
- // * -> Date
- case DATE_TYPE_INFO =>
- throw new CodeGenException("Date type not supported yet.")
-
- // * -> Void
- case VOID_TYPE_INFO =>
- throw new CodeGenException("Void type not supported.")
-
- // * -> Character
- case CHAR_TYPE_INFO =>
- throw new CodeGenException("Character type not supported.")
-
- // NUMERIC TYPE -> Boolean
- case BOOLEAN_TYPE_INFO if isNumeric(operand) =>
- generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
- (operandTerm) => s"$operandTerm != 0"
- }
-
- // String -> BASIC TYPE (not String, Date, Void, Character)
- case ti: BasicTypeInfo[_] if isString(operand) =>
- val wrapperClass = targetType.getTypeClass.getCanonicalName
- generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
- (operandTerm) => s"$wrapperClass.valueOf($operandTerm)"
- }
-
- // NUMERIC TYPE -> NUMERIC TYPE
- case nti: NumericTypeInfo[_] if isNumeric(operand) =>
- val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
- generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
- (operandTerm) => s"($targetTypeTerm) $operandTerm"
- }
-
- // Boolean -> NUMERIC TYPE
- case nti: NumericTypeInfo[_] if isBoolean(operand) =>
- val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
- generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
- (operandTerm) => s"($targetTypeTerm) ($operandTerm ? 1 : 0)"
- }
-
- case _ =>
- throw new CodeGenException(s"Unsupported cast from '${operand.resultType}'" +
- s"to '$targetType'.")
- }
- }
-
- // ----------------------------------------------------------------------------------------------
-
- private def generateUnaryOperatorIfNotNull(
- nullCheck: Boolean,
- resultType: TypeInformation[_],
- operand: GeneratedExpression)
- (expr: (String) => String)
- : GeneratedExpression = {
- val resultTerm = newName("result")
- val nullTerm = newName("isNull")
- val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
- val defaultValue = primitiveDefaultValue(resultType)
-
- val operatorCode = if (nullCheck) {
- s"""
- |${operand.code}
- |$resultTypeTerm $resultTerm;
- |boolean $nullTerm;
- |if (!${operand.nullTerm}) {
- | $resultTerm = ${expr(operand.resultTerm)};
- | $nullTerm = false;
- |}
- |else {
- | $resultTerm = $defaultValue;
- | $nullTerm = true;
- |}
- |""".stripMargin
- }
- else {
- s"""
- |${operand.code}
- |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)};
- |""".stripMargin
- }
-
- GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
- }
-
- private def generateOperatorIfNotNull(
- nullCheck: Boolean,
- resultType: TypeInformation[_],
- left: GeneratedExpression,
- right: GeneratedExpression)
- (expr: (String, String) => String)
- : GeneratedExpression = {
- val resultTerm = newName("result")
- val nullTerm = newName("isNull")
- val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
- val defaultValue = primitiveDefaultValue(resultType)
-
- val resultCode = if (nullCheck) {
- s"""
- |${left.code}
- |${right.code}
- |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm};
- |$resultTypeTerm $resultTerm;
- |if ($nullTerm) {
- | $resultTerm = $defaultValue;
- |}
- |else {
- | $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
- |}
- |""".stripMargin
- }
- else {
- s"""
- |${left.code}
- |${right.code}
- |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
- |""".stripMargin
- }
-
- GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
new file mode 100644
index 0000000..561dac7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
+
+trait CallGenerator {
+
+ def generate(
+ codeGenerator: CodeGenerator,
+ operands: Seq[GeneratedExpression])
+ : GeneratedExpression
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
new file mode 100644
index 0000000..f5ab6bf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import java.lang.reflect.Method
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
+
+class MethodCallGenerator(returnType: TypeInformation[_], method: Method) extends CallGenerator {
+
+ override def generate(
+ codeGenerator: CodeGenerator,
+ operands: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType)
+ val defaultValue = primitiveDefaultValue(returnType)
+
+ val resultCode = if (codeGenerator.nullCheck) {
+ s"""
+ |${operands.map(_.code).mkString("\n")}
+ |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
+ |$resultTypeTerm $resultTerm;
+ |if ($nullTerm) {
+ | $resultTerm = $defaultValue;
+ |}
+ |else {
+ | $resultTerm = ${method.getDeclaringClass.getCanonicalName}.${method.getName}(
+ | ${operands.map(_.resultTerm).mkString(", ")});
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${operands.map(_.code).mkString("\n")}
+ |$resultTypeTerm $resultTerm = ${method.getDeclaringClass.getCanonicalName}.
+ | ${method.getName}(${operands.map(_.resultTerm).mkString(", ")});
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
new file mode 100644
index 0000000..8eda314
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import java.lang.reflect.Method
+
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.collection.mutable
+
+/**
+ * Global registry of built-in advanced SQL scalar functions.
+ */
+object ScalarFunctions {
+
+ private val sqlFunctions: mutable.Map[(SqlOperator, Seq[TypeInformation[_]]), CallGenerator] =
+ mutable.Map()
+
+ // ----------------------------------------------------------------------------------------------
+ addSqlFunctionMethod(
+ SUBSTRING,
+ Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO),
+ STRING_TYPE_INFO,
+ BuiltInMethod.SUBSTRING.method)
+
+ addSqlFunctionMethod(
+ SUBSTRING,
+ Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
+ STRING_TYPE_INFO,
+ BuiltInMethod.SUBSTRING.method)
+
+ // ----------------------------------------------------------------------------------------------
+
+ def getCallGenerator(
+ call: SqlOperator,
+ operandTypes: Seq[TypeInformation[_]])
+ : Option[CallGenerator] = {
+ sqlFunctions.get((call, operandTypes))
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private def addSqlFunctionMethod(
+ sqlOperator: SqlOperator,
+ operandTypes: Seq[TypeInformation[_]],
+ returnType: TypeInformation[_],
+ method: Method)
+ : Unit = {
+ sqlFunctions((sqlOperator, operandTypes)) = new MethodCallGenerator(returnType, method)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
new file mode 100644
index 0000000..8580b25
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.{CodeGenException, GeneratedExpression}
+
+object ScalarOperators {
+
+ def generateArithmeticOperator(
+ operator: String,
+ nullCheck: Boolean,
+ resultType: TypeInformation[_],
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ // String arithmetic // TODO rework
+ if (isString(left)) {
+ generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+ (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+ }
+ }
+ // Numeric arithmetic
+ else if (isNumeric(left) && isNumeric(right)) {
+ val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
+ val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+
+ generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+ (leftTerm, rightTerm) =>
+ // no casting required
+ if (leftType == resultType && rightType == resultType) {
+ s"$leftTerm $operator $rightTerm"
+ }
+ // left needs casting
+ else if (leftType != resultType && rightType == resultType) {
+ s"(($resultTypeTerm) $leftTerm) $operator $rightTerm"
+ }
+ // right needs casting
+ else if (leftType == resultType && rightType != resultType) {
+ s"$leftTerm $operator (($resultTypeTerm) $rightTerm)"
+ }
+ // both sides need casting
+ else {
+ s"(($resultTypeTerm) $leftTerm) $operator (($resultTypeTerm) $rightTerm)"
+ }
+ }
+ }
+ else {
+ throw new CodeGenException("Unsupported arithmetic operation.")
+ }
+ }
+
+ def generateUnaryArithmeticOperator(
+ operator: String,
+ nullCheck: Boolean,
+ resultType: TypeInformation[_],
+ operand: GeneratedExpression)
+ : GeneratedExpression = {
+ generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) {
+ (operandTerm) => s"$operator($operandTerm)"
+ }
+ }
+
+ def generateEquals(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ if (isReference(left)) {
+ (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
+ }
+ else if (isReference(right)) {
+ (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)"
+ }
+ else {
+ (leftTerm, rightTerm) => s"$leftTerm == $rightTerm"
+ }
+ }
+ }
+
+ def generateNotEquals(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ if (isReference(left)) {
+ (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
+ }
+ else if (isReference(right)) {
+ (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))"
+ }
+ else {
+ (leftTerm, rightTerm) => s"$leftTerm != $rightTerm"
+ }
+ }
+ }
+
+ def generateComparison(
+ operator: String,
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ if (isString(left) && isString(right)) {
+ (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0"
+ }
+ else if (isNumeric(left) && isNumeric(right)) {
+ (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+ }
+ else {
+ throw new CodeGenException("Comparison is only supported for Strings and numeric types.")
+ }
+ }
+ }
+
+ def generateIsNull(
+ nullCheck: Boolean,
+ operand: GeneratedExpression)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val operatorCode = if (nullCheck) {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = ${operand.nullTerm};
+ |boolean $nullTerm = false;
+ |""".stripMargin
+ }
+ else if (!nullCheck && isReference(operand.resultType)) {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = ${operand.resultTerm} == null;
+ |boolean $nullTerm = false;
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = false;
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+ }
+
+ def generateIsNotNull(
+ nullCheck: Boolean,
+ operand: GeneratedExpression)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val operatorCode = if (nullCheck) {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = !${operand.nullTerm};
+ |boolean $nullTerm = false;
+ |""".stripMargin
+ }
+ else if (!nullCheck && isReference(operand.resultType)) {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = ${operand.resultTerm} != null;
+ |boolean $nullTerm = false;
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = true;
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+ }
+
+ def generateAnd(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+
+ val operatorCode = if (nullCheck) {
+ // Three-valued logic:
+ // no Unknown -> Two-valued logic
+ // True && Unknown -> Unknown
+ // False && Unknown -> False
+ // Unknown && True -> Unknown
+ // Unknown && False -> False
+ // Unknown && Unknown -> Unknown
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $resultTerm;
+ |boolean $nullTerm;
+ |if (!${left.nullTerm} && !${right.nullTerm}) {
+ | $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+ | $nullTerm = false;
+ |}
+ |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = false;
+ |}
+ |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = false;
+ |}
+ |else {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+ }
+
+ def generateOr(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+
+ val operatorCode = if (nullCheck) {
+ // Three-valued logic:
+ // no Unknown -> Two-valued logic
+ // True && Unknown -> True
+ // False && Unknown -> Unknown
+ // Unknown && True -> True
+ // Unknown && False -> Unknown
+ // Unknown && Unknown -> Unknown
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $resultTerm;
+ |boolean $nullTerm;
+ |if (!${left.nullTerm} && !${right.nullTerm}) {
+ | $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+ | $nullTerm = false;
+ |}
+ |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+ | $resultTerm = true;
+ | $nullTerm = false;
+ |}
+ |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+ | $resultTerm = true;
+ | $nullTerm = false;
+ |}
+ |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |else {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+ }
+
+ def generateNot(
+ nullCheck: Boolean,
+ operand: GeneratedExpression)
+ : GeneratedExpression = {
+ // Three-valued logic:
+ // no Unknown -> Two-valued logic
+ // Unknown -> Unknown
+ generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) {
+ (operandTerm) => s"!($operandTerm)"
+ }
+ }
+
+ def generateCast(
+ nullCheck: Boolean,
+ operand: GeneratedExpression,
+ targetType: TypeInformation[_])
+ : GeneratedExpression = {
+ targetType match {
+ // identity casting
+ case operand.resultType =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$operandTerm"
+ }
+
+ // * -> String
+ case STRING_TYPE_INFO =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s""" "" + $operandTerm"""
+ }
+
+ // * -> Date
+ case DATE_TYPE_INFO =>
+ throw new CodeGenException("Date type not supported yet.")
+
+ // * -> Void
+ case VOID_TYPE_INFO =>
+ throw new CodeGenException("Void type not supported.")
+
+ // * -> Character
+ case CHAR_TYPE_INFO =>
+ throw new CodeGenException("Character type not supported.")
+
+ // NUMERIC TYPE -> Boolean
+ case BOOLEAN_TYPE_INFO if isNumeric(operand) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$operandTerm != 0"
+ }
+
+ // String -> BASIC TYPE (not String, Date, Void, Character)
+ case ti: BasicTypeInfo[_] if isString(operand) =>
+ val wrapperClass = targetType.getTypeClass.getCanonicalName
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$wrapperClass.valueOf($operandTerm)"
+ }
+
+ // NUMERIC TYPE -> NUMERIC TYPE
+ case nti: NumericTypeInfo[_] if isNumeric(operand) =>
+ val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"($targetTypeTerm) $operandTerm"
+ }
+
+ // Boolean -> NUMERIC TYPE
+ case nti: NumericTypeInfo[_] if isBoolean(operand) =>
+ val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"($targetTypeTerm) ($operandTerm ? 1 : 0)"
+ }
+
+ case _ =>
+ throw new CodeGenException(s"Unsupported cast from '${operand.resultType}'" +
+ s"to '$targetType'.")
+ }
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private def generateUnaryOperatorIfNotNull(
+ nullCheck: Boolean,
+ resultType: TypeInformation[_],
+ operand: GeneratedExpression)
+ (expr: (String) => String)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+ val defaultValue = primitiveDefaultValue(resultType)
+
+ val operatorCode = if (nullCheck) {
+ s"""
+ |${operand.code}
+ |$resultTypeTerm $resultTerm;
+ |boolean $nullTerm;
+ |if (!${operand.nullTerm}) {
+ | $resultTerm = ${expr(operand.resultTerm)};
+ | $nullTerm = false;
+ |}
+ |else {
+ | $resultTerm = $defaultValue;
+ | $nullTerm = true;
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${operand.code}
+ |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)};
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+ }
+
+ private def generateOperatorIfNotNull(
+ nullCheck: Boolean,
+ resultType: TypeInformation[_],
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ (expr: (String, String) => String)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+ val defaultValue = primitiveDefaultValue(resultType)
+
+ val resultCode = if (nullCheck) {
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm};
+ |$resultTypeTerm $resultTerm;
+ |if ($nullTerm) {
+ | $resultTerm = $defaultValue;
+ |}
+ |else {
+ | $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${left.code}
+ |${right.code}
+ |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
index a39d601..bf551dd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
case class Substring(
str: Expression,
beginIndex: Expression,
- endIndex: Expression) extends Expression {
+ endIndex: Option[Expression] = None) extends Expression {
def typeInfo = {
if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
throw new ExpressionException(
@@ -33,14 +33,23 @@ case class Substring(
throw new ExpressionException(
s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""")
}
- if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""")
+ endIndex match {
+ case Some(endIdx) if !endIdx.typeInfo.isInstanceOf[IntegerTypeInfo[_]] =>
+ throw new ExpressionException(
+ s"""End index must be an integer type in $this, is ${endIdx.typeInfo}.""")
+ case _ => // ok
}
BasicTypeInfo.STRING_TYPE_INFO
}
- override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
- override def toString = s"($str).substring($beginIndex, $endIndex)"
+ override def children: Seq[Expression] = endIndex match {
+ case Some(endIdx) => Seq(str, beginIndex, endIdx)
+ case None => Seq(str, beginIndex)
+ }
+
+ override def toString = endIndex match {
+ case Some(endIdx) => s"($str).substring($beginIndex, $endIndex)"
+ case None => s"($str).substring($beginIndex)"
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index d082c7e..ebbbf8b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -125,13 +125,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val substring: PackratParser[Expression] =
atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
- case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to)
+ case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, Some(to))
}
lazy val substringWithoutEnd: PackratParser[Expression] =
atom ~ ".substring(" ~ expression ~ ")" ^^ {
- case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE))
+ case e ~ _ ~ from ~ _ => Substring(e, from)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index bad111f..fb9d057 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -52,17 +52,25 @@ object RexNodeTranslator {
val l = extractAggCalls(b.left, relBuilder)
val r = extractAggCalls(b.right, relBuilder)
(b.makeCopy(List(l._1, r._1)), l._2 ::: r._2)
- case s: Substring =>
+
+ // Scalar functions
+ case s@Substring(_, _, Some(endIndex)) =>
val str = extractAggCalls(s.str, relBuilder)
val sta = extractAggCalls(s.beginIndex, relBuilder)
- val end = extractAggCalls(s.endIndex, relBuilder)
+ val end = extractAggCalls(endIndex, relBuilder)
(s.makeCopy(
- List(str._1, sta._1, end._1)),
+ List(str._1, sta._1, Some(end._1))),
(str._2 ::: sta._2) ::: end._2
)
- case e@_ =>
+
+ case s@Substring(_, _, None) =>
+ val str = extractAggCalls(s.str, relBuilder)
+ val sta = extractAggCalls(s.beginIndex, relBuilder)
+ (s.makeCopy(List(str._1, sta._1, None)), str._2 ::: sta._2)
+
+ case e@AnyRef =>
throw new IllegalArgumentException(
- s"Expression ${e} of type ${e.getClass()} not supported yet")
+ s"Expression $e of type ${e.getClass} not supported yet")
}
}
@@ -72,6 +80,7 @@ object RexNodeTranslator {
def toRexNode(exp: Expression, relBuilder: RelBuilder): RexNode = {
exp match {
+ // Basic operators
case Literal(value, tpe) =>
relBuilder.literal(value)
case ResolvedFieldReference(name, tpe) =>
@@ -151,16 +160,24 @@ object RexNodeTranslator {
case UnaryMinus(child) =>
val c = toRexNode(child, relBuilder)
relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, c)
- case Substring(string, start, end) =>
+
+ // Scalar functions
+ case Substring(string, start, Some(end)) =>
val str = toRexNode(string, relBuilder)
val sta = toRexNode(start, relBuilder)
val en = toRexNode(end, relBuilder)
relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta, en)
+
+ case Substring(string, start, None) =>
+ val str = toRexNode(string, relBuilder)
+ val sta = toRexNode(start, relBuilder)
+ relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta)
+
case a: Aggregation =>
- throw new IllegalArgumentException(s"Aggregation expression ${a} not allowed at this place")
- case e@_ =>
+ throw new IllegalArgumentException(s"Aggregation expression $a not allowed at this place")
+ case e@AnyRef =>
throw new IllegalArgumentException(
- s"Expression ${e} of type ${e.getClass()} not supported yet")
+ s"Expression $e of type ${e.getClass} not supported yet")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index e1c9e96..628cbef 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -25,13 +25,11 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.codegen.CodeGenException;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index e009cb8..5b22574 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.codegen.CodeGenException;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
index 222f161..8c30163 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index b8ca4cd..c69d1a7 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.junit.Test;
import org.junit.runner.RunWith;
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index c4ac138..3ce9891 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.junit.Test;
import org.junit.runner.RunWith;
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 315fe9f..6b8f984 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -40,7 +40,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
super(mode);
}
- @Test(expected = CodeGenException.class)
+ @Test
public void testSubstring() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
@@ -52,7 +52,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
Table in = tableEnv.fromDataSet(ds, "a, b");
Table result = in
- .select("a.substring(0, b)");
+ .select("a.substring(1, b)");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
@@ -60,14 +60,14 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
compareResultAsText(results, expected);
}
- @Test(expected = CodeGenException.class)
+ @Test
public void testSubstringWithMaxEnd() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
DataSet<Tuple2<String, Integer>> ds = env.fromElements(
- new Tuple2<>("ABCD", 2),
- new Tuple2<>("ABCD", 1));
+ new Tuple2<>("ABCD", 3),
+ new Tuple2<>("ABCD", 2));
Table in = tableEnv.fromDataSet(ds, "a, b");
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 54580c4..f2120ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -22,8 +22,9 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.test.TableProgramsTestBase
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 8ab44a3..ba0311a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -25,8 +25,9 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.test.TableProgramsTestBase
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 96b9341..c73d7eb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -23,8 +23,9 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.test.TableProgramsTestBase
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index e181e0b..4a78079 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -22,8 +22,9 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.test.TableProgramsTestBase
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index f2e9aca..c4fc346 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -18,35 +18,35 @@
package org.apache.flink.api.scala.table.test
-import org.apache.flink.api.table.{Row, ExpressionException}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.api.table.Row
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
+
import scala.collection.JavaConverters._
-import org.apache.flink.api.table.codegen.CodeGenException
@RunWith(classOf[Parameterized])
class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- @Test(expected = classOf[CodeGenException])
+ @Test
def testSubstring(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
- .select('a.substring(0, 'b))
+ .select('a.substring(1, 'b))
val expected = "AA\nB"
val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- @Test(expected = classOf[CodeGenException])
+ @Test
def testSubstringWithMaxEnd(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val t = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
+ val t = env.fromElements(("ABCD", 3), ("ABCD", 2)).as('a, 'b)
.select('a.substring('b))
val expected = "CD\nBCD"
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
new file mode 100644
index 0000000..9989c1a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.test
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.test.utils.ExpressionEvaluator
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class ScalarFunctionsTest {
+
+ @Test
+ def testSubstring(): Unit = {
+ testFunction(
+ 'f0.substring(2),
+ "f0.substring(2)",
+ "SUBSTRING(f0, 2)",
+ "his is a test String.")
+
+ testFunction(
+ 'f0.substring(2, 5),
+ "f0.substring(2, 5)",
+ "SUBSTRING(f0, 2, 5)",
+ "his i")
+
+ testFunction(
+ 'f0.substring(1, 'f7),
+ "f0.substring(1, f7)",
+ "SUBSTRING(f0, 1, f7)",
+ "Thi")
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ def testFunction(
+ expr: Expression,
+ exprString: String,
+ sqlExpr: String,
+ expected: String): Unit = {
+ val testData = new Row(8)
+ testData.setField(0, "This is a test String.")
+ testData.setField(1, true)
+ testData.setField(2, 42.toByte)
+ testData.setField(3, 43.toShort)
+ testData.setField(4, 44.toLong)
+ testData.setField(5, 4.5.toFloat)
+ testData.setField(6, 4.6)
+ testData.setField(7, 3)
+
+ val typeInfo = new RowTypeInfo(Seq(
+ STRING_TYPE_INFO,
+ BOOLEAN_TYPE_INFO,
+ BYTE_TYPE_INFO,
+ SHORT_TYPE_INFO,
+ LONG_TYPE_INFO,
+ FLOAT_TYPE_INFO,
+ DOUBLE_TYPE_INFO,
+ INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
+
+ val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
+ assertEquals(expected, exprResult)
+
+ val exprStringResult = ExpressionEvaluator.evaluate(
+ testData,
+ typeInfo,
+ ExpressionParser.parseExpression(exprString))
+ assertEquals(expected, exprStringResult)
+
+ // TODO test SQL expression
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
deleted file mode 100644
index 0962f4e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.test
-
-import java.util
-
-import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv}
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
-import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv}
-import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv}
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL}
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
-
-class TableProgramsTestBase(
- mode: TestExecutionMode,
- tableConfigMode: TableConfigMode)
- extends MultipleProgramsTestBase(mode) {
-
- def getJavaTableEnvironment: JavaTableEnv = {
- val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv
- val tableEnv = new JavaTableEnv
- configure(tableEnv.getConfig)
- tableEnv
- }
-
- def getScalaTableEnvironment: ScalaTableEnv = {
- val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv
- val tableEnv = new ScalaTableEnv
- configure(tableEnv.getConfig)
- tableEnv
- }
-
- def getConfig: TableConfig = {
- val config = new TableConfig()
- configure(config)
- config
- }
-
- def configure(config: TableConfig): Unit = {
- tableConfigMode match {
- case NULL =>
- config.setNullCheck(true)
- case EFFICIENT =>
- config.setEfficientTypeUsage(true)
- case _ => // keep default
- }
- }
-
-}
-
-object TableProgramsTestBase {
- sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean }
- object TableConfigMode {
- case object DEFAULT extends TableConfigMode {
- val nullCheck = false; val efficientTypes = false
- }
- case object NULL extends TableConfigMode {
- val nullCheck = true; val efficientTypes = false
- }
- case object EFFICIENT extends TableConfigMode {
- val nullCheck = false; val efficientTypes = true
- }
- }
-
- @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
- def tableConfigs(): util.Collection[Array[java.lang.Object]] = {
- Seq(
- // TODO more tests in cluster mode?
- Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT),
- Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT),
- Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL),
- Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT)
- )
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
new file mode 100644
index 0000000..b06ac26
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.test.utils
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.functions.{Function, MapFunction}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.plan.{RexNodeTranslator, TranslationContext}
+import org.apache.flink.api.table.plan.schema.DataSetTable
+import org.apache.flink.api.table.runtime.FunctionCompiler
+import org.mockito.Mockito._
+
+/**
+ * Utility to translate and evaluate an RexNode or Table API expression to a String.
+ */
+object ExpressionEvaluator {
+
+ // TestCompiler that uses current class loader
+ class TestCompiler[T <: Function] extends FunctionCompiler[T] {
+ def compile(genFunc: GeneratedFunction[T]): Class[T] =
+ compile(getClass.getClassLoader, genFunc.name, genFunc.code)
+ }
+
+ private def prepareRelBuilder(typeInfo: TypeInformation[Any]): RelBuilder = {
+ // create DataSetTable
+ val dataSetMock = mock(classOf[DataSet[Any]])
+ when(dataSetMock.getType).thenReturn(typeInfo)
+ val tableName = TranslationContext.addDataSet(new DataSetTable[Any](
+ dataSetMock,
+ (0 until typeInfo.getArity).toArray,
+ (0 until typeInfo.getArity).map("f" + _).toArray))
+
+ // prepare RelBuilder
+ val relBuilder = TranslationContext.getRelBuilder
+ relBuilder.scan(tableName)
+ relBuilder
+ }
+
+ def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = {
+ val relBuilder = prepareRelBuilder(typeInfo)
+ evaluate(data, typeInfo, relBuilder, RexNodeTranslator.toRexNode(expr, relBuilder))
+ }
+
+ def evaluate(
+ data: Any,
+ typeInfo: TypeInformation[Any],
+ relBuilder: RelBuilder,
+ rexNode: RexNode): String = {
+ // generate code for Mapper
+ val config = new TableConfig()
+ val generator = new CodeGenerator(config, typeInfo)
+ val genExpr = generator.generateExpression(relBuilder.cast(rexNode, VARCHAR)) // cast to String
+ val bodyCode =
+ s"""
+ |${genExpr.code}
+ |return ${genExpr.resultTerm};
+ |""".stripMargin
+ val genFunc = generator.generateFunction[MapFunction[Any, String]](
+ "TestFunction",
+ classOf[MapFunction[Any, String]],
+ bodyCode,
+ STRING_TYPE_INFO.asInstanceOf[TypeInformation[Any]])
+
+ // compile and evaluate
+ val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc)
+ val mapper = clazz.newInstance()
+ mapper.map(data)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
new file mode 100644
index 0000000..78be519
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.test.utils
+
+import java.util
+
+import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
+import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv}
+import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv}
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL}
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
+
+class TableProgramsTestBase(
+ mode: TestExecutionMode,
+ tableConfigMode: TableConfigMode)
+ extends MultipleProgramsTestBase(mode) {
+
+ def getJavaTableEnvironment: JavaTableEnv = {
+ val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv
+ val tableEnv = new JavaTableEnv
+ configure(tableEnv.getConfig)
+ tableEnv
+ }
+
+ def getScalaTableEnvironment: ScalaTableEnv = {
+ val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv
+ val tableEnv = new ScalaTableEnv
+ configure(tableEnv.getConfig)
+ tableEnv
+ }
+
+ def getConfig: TableConfig = {
+ val config = new TableConfig()
+ configure(config)
+ config
+ }
+
+ def configure(config: TableConfig): Unit = {
+ tableConfigMode match {
+ case NULL =>
+ config.setNullCheck(true)
+ case EFFICIENT =>
+ config.setEfficientTypeUsage(true)
+ case _ => // keep default
+ }
+ }
+
+}
+
+object TableProgramsTestBase {
+ sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean }
+ object TableConfigMode {
+ case object DEFAULT extends TableConfigMode {
+ val nullCheck = false; val efficientTypes = false
+ }
+ case object NULL extends TableConfigMode {
+ val nullCheck = true; val efficientTypes = false
+ }
+ case object EFFICIENT extends TableConfigMode {
+ val nullCheck = false; val efficientTypes = true
+ }
+ }
+
+ @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+ def tableConfigs(): util.Collection[Array[java.lang.Object]] = {
+ Seq(
+ // TODO more tests in cluster mode?
+ Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT),
+ Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT),
+ Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL),
+ Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT)
+ )
+ }
+}
|