flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject flink git commit: [FLINK-3573] [table] Implement more String functions for Table API
Date Tue, 08 Mar 2016 13:07:49 GMT
Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 2fcdad96a -> e9b9b6f3a


[FLINK-3573] [table] Implement more String functions for Table API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9b9b6f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9b9b6f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9b9b6f3

Branch: refs/heads/tableOnCalcite
Commit: e9b9b6f3a61ac18b92b183408abf85aa243ccb54
Parents: 2fcdad9
Author: twalthr <twalthr@apache.org>
Authored: Fri Mar 4 11:37:58 2016 +0100
Committer: twalthr <twalthr@apache.org>
Committed: Tue Mar 8 14:06:37 2016 +0100

----------------------------------------------------------------------
 .../flink/api/scala/table/expressionDsl.scala   |  49 ++++++++
 .../flink/api/table/codegen/CodeGenerator.scala |   7 ++
 .../api/table/codegen/calls/CallGenerator.scala |  39 +++++++
 .../codegen/calls/MethodCallGenerator.scala     |  35 ++----
 .../table/codegen/calls/NotCallGenerator.scala  |  37 ++++++
 .../table/codegen/calls/ScalarFunctions.scala   |  64 +++++++++++
 .../table/codegen/calls/TrimCallGenerator.scala |  49 ++------
 .../flink/api/table/expressions/call.scala      |   6 +
 .../api/table/plan/RexNodeTranslator.scala      |   6 +
 .../api/table/test/ScalarFunctionsTest.scala    | 115 +++++++++++++++++++
 10 files changed, 342 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index a3484a1..0f1fb67 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -124,6 +124,55 @@ trait ImplicitExpressionOperations {
       expr
     }
   }
+
+  /**
+    * Returns the length of a String.
+    */
+  def charLength() = {
+    Call(BuiltInFunctionNames.CHAR_LENGTH, expr)
+  }
+
+  /**
+    * Returns all of the characters in a String in upper case using the rules of
+    * the default locale.
+    */
+  def upperCase() = {
+    Call(BuiltInFunctionNames.UPPER_CASE, expr)
+  }
+
+  /**
+    * Returns all of the characters in a String in lower case using the rules of
+    * the default locale.
+    */
+  def lowerCase() = {
+    Call(BuiltInFunctionNames.LOWER_CASE, expr)
+  }
+
+  /**
+    * Converts the initial letter of each word in a String to uppercase.
+    * Assumes a String containing only [A-Za-z0-9], everything else is treated as whitespace.
+    */
+  def initCap() = {
+    Call(BuiltInFunctionNames.INIT_CAP, expr)
+  }
+
+  /**
+    * Returns true, if a String matches the specified LIKE pattern.
+    *
+    * e.g. "Jo_n%" matches all Strings that start with "Jo(arbitrary letter)n"
+    */
+  def like(pattern: Expression) = {
+    Call(BuiltInFunctionNames.LIKE, expr, pattern)
+  }
+
+  /**
+    * Returns true, if a String matches the specified SQL regex pattern.
+    *
+    * e.g. "A+" matches all Strings that consist of at least one A
+    */
+  def similar(pattern: Expression) = {
+    Call(BuiltInFunctionNames.SIMILAR, expr, pattern)
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 64f98e8..5ebd4c3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -712,6 +712,13 @@ class CodeGenerator(
         val operand = operands.head
         generateCast(nullCheck, operand, resultType)
 
+      // string arithmetic
+      case CONCAT =>
+        val left = operands.head
+        val right = operands(1)
+        requireString(left)
+        generateArithmeticOperator("+", nullCheck, resultType, left, right)
+
       // advanced scalar functions
       case call: SqlOperator =>
         val callGen = ScalarFunctions.getCallGenerator(call, operands.map(_.resultType))

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
index 561dac7..544216a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.table.codegen.calls
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.CodeGenUtils._
 import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
 
 trait CallGenerator {
@@ -28,3 +30,40 @@ trait CallGenerator {
     : GeneratedExpression
 
 }
+
+object CallGenerator {
+
+  def generateCallIfArgsNotNull(
+      nullCheck: Boolean,
+      returnType: TypeInformation[_],
+      operands: Seq[GeneratedExpression])
+      (call: (Seq[String]) => String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType)
+    val defaultValue = primitiveDefaultValue(returnType)
+
+    val resultCode = if (nullCheck) {
+      s"""
+        |${operands.map(_.code).mkString("\n")}
+        |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
+        |$resultTypeTerm $resultTerm;
+        |if ($nullTerm) {
+        |  $resultTerm = $defaultValue;
+        |}
+        |else {
+        |  $resultTerm = ${call(operands.map(_.resultTerm))};
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operands.map(_.code).mkString("\n")}
+        |$resultTypeTerm $resultTerm = ${call(operands.map(_.resultTerm))};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
index f5ab6bf..72e922d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.codegen.calls
 import java.lang.reflect.Method
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
 import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
 
 class MethodCallGenerator(returnType: TypeInformation[_], method: Method) extends CallGenerator {
@@ -30,33 +30,12 @@ class MethodCallGenerator(returnType: TypeInformation[_], method: Method) extend
       codeGenerator: CodeGenerator,
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType)
-    val defaultValue = primitiveDefaultValue(returnType)
-
-    val resultCode = if (codeGenerator.nullCheck) {
-      s"""
-        |${operands.map(_.code).mkString("\n")}
-        |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
-        |$resultTypeTerm $resultTerm;
-        |if ($nullTerm) {
-        |  $resultTerm = $defaultValue;
-        |}
-        |else {
-        |  $resultTerm = ${method.getDeclaringClass.getCanonicalName}.${method.getName}(
-        |    ${operands.map(_.resultTerm).mkString(", ")});
-        |}
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${operands.map(_.code).mkString("\n")}
-        |$resultTypeTerm $resultTerm = ${method.getDeclaringClass.getCanonicalName}.
-        |  ${method.getName}(${operands.map(_.resultTerm).mkString(", ")});
-        |""".stripMargin
+    generateCallIfArgsNotNull(codeGenerator.nullCheck, returnType, operands) {
+      (operandResultTerms) =>
+        s"""
+          |${method.getDeclaringClass.getCanonicalName}.
+          |  ${method.getName}(${operandResultTerms.mkString(", ")})
+         """.stripMargin
     }
-
-    GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala
new file mode 100644
index 0000000..5cd358a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import org.apache.flink.api.table.codegen.calls.ScalarOperators.generateNot
+import org.apache.flink.api.table.codegen.{GeneratedExpression, CodeGenerator}
+
+/**
+  * Inverts the boolean value of a CallGenerator result.
+  */
+class NotCallGenerator(callGenerator: CallGenerator) extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    val expr = callGenerator.generate(codeGenerator, operands)
+    generateNot(codeGenerator.nullCheck, expr)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
index ec0d00b..fb807a3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
@@ -37,6 +37,9 @@ object ScalarFunctions {
     mutable.Map()
 
   // ----------------------------------------------------------------------------------------------
+  // String functions
+  // ----------------------------------------------------------------------------------------------
+
   addSqlFunctionMethod(
     SUBSTRING,
     Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO),
@@ -54,6 +57,58 @@ object ScalarFunctions {
     Seq(INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
     STRING_TYPE_INFO)
 
+  addSqlFunctionMethod(
+    CHAR_LENGTH,
+    Seq(STRING_TYPE_INFO),
+    INT_TYPE_INFO,
+    BuiltInMethod.CHAR_LENGTH.method)
+
+  addSqlFunctionMethod(
+    CHARACTER_LENGTH,
+    Seq(STRING_TYPE_INFO),
+    INT_TYPE_INFO,
+    BuiltInMethod.CHAR_LENGTH.method)
+
+  addSqlFunctionMethod(
+    UPPER,
+    Seq(STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethod.UPPER.method)
+
+  addSqlFunctionMethod(
+    LOWER,
+    Seq(STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethod.LOWER.method)
+
+  addSqlFunctionMethod(
+    INITCAP,
+    Seq(STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethod.INITCAP.method)
+
+  addSqlFunctionMethod(
+    LIKE,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
+    BOOLEAN_TYPE_INFO,
+    BuiltInMethod.LIKE.method)
+
+  addSqlFunctionNotMethod(
+    NOT_LIKE,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
+    BuiltInMethod.LIKE.method)
+
+  addSqlFunctionMethod(
+    SIMILAR_TO,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
+    BOOLEAN_TYPE_INFO,
+    BuiltInMethod.SIMILAR.method)
+
+  addSqlFunctionNotMethod(
+    NOT_SIMILAR_TO,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
+    BuiltInMethod.SIMILAR.method)
+
   // ----------------------------------------------------------------------------------------------
 
   def getCallGenerator(
@@ -74,6 +129,15 @@ object ScalarFunctions {
     sqlFunctions((sqlOperator, operandTypes)) = new MethodCallGenerator(returnType, method)
   }
 
+  private def addSqlFunctionNotMethod(
+      sqlOperator: SqlOperator,
+      operandTypes: Seq[TypeInformation[_]],
+      method: Method)
+    : Unit = {
+    sqlFunctions((sqlOperator, operandTypes)) =
+      new NotCallGenerator(new MethodCallGenerator(BOOLEAN_TYPE_INFO, method))
+  }
+
   private def addSqlFunctionTrim(
       sqlOperator: SqlOperator,
       operandTypes: Seq[TypeInformation[_]],

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala
index dcf6107..72e60c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.codegen.calls
 import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING}
 import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.calls.CallGenerator._
 import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
 
 /**
@@ -36,43 +36,18 @@ class TrimCallGenerator(returnType: TypeInformation[_]) extends CallGenerator {
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
     val method = BuiltInMethod.TRIM.method
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType)
-    val defaultValue = primitiveDefaultValue(returnType)
-
-    val methodCall =
-      s"""
-        |${method.getDeclaringClass.getCanonicalName}.${method.getName}(
-        |  ${operands.head.resultTerm} == ${BOTH.ordinal()} ||
-        |    ${operands.head.resultTerm} == ${LEADING.ordinal()},
-        |  ${operands.head.resultTerm} == ${BOTH.ordinal()} ||
-        |    ${operands.head.resultTerm} == ${TRAILING.ordinal()},
-        |  ${operands(1).resultTerm},
-        |  ${operands(2).resultTerm})
-        |""".stripMargin
-
-    val resultCode = if (codeGenerator.nullCheck) {
-      s"""
-        |${operands.map(_.code).mkString("\n")}
-        |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
-        |$resultTypeTerm $resultTerm;
-        |if ($nullTerm) {
-        |  $resultTerm = $defaultValue;
-        |}
-        |else {
-        |  $resultTerm = $methodCall;
-        |}
-        |""".stripMargin
+    generateCallIfArgsNotNull(codeGenerator.nullCheck, returnType, operands) {
+      (operandResultTerms) =>
+        s"""
+          |${method.getDeclaringClass.getCanonicalName}.${method.getName}(
+          |  ${operandResultTerms.head} == ${BOTH.ordinal()} ||
+          |    ${operandResultTerms.head} == ${LEADING.ordinal()},
+          |  ${operandResultTerms.head} == ${BOTH.ordinal()} ||
+          |    ${operandResultTerms.head} == ${TRAILING.ordinal()},
+          |  ${operandResultTerms(1)},
+          |  ${operandResultTerms(2)})
+          |""".stripMargin
     }
-    else {
-      s"""
-        |${operands.map(_.code).mkString("\n")}
-        |$resultTypeTerm $resultTerm = $methodCall;
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
index 6d26946..7f46435 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
@@ -44,6 +44,12 @@ case class Call(functionName: String, args: Expression*) extends Expression {
 object BuiltInFunctionNames {
   val SUBSTRING = "SUBSTRING"
   val TRIM = "TRIM"
+  val CHAR_LENGTH = "CHARLENGTH"
+  val UPPER_CASE = "UPPERCASE"
+  val LOWER_CASE = "LOWERCASE"
+  val INIT_CAP = "INITCAP"
+  val LIKE = "LIKE"
+  val SIMILAR = "SIMILAR"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index ac9c85a..7d9de83 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -189,6 +189,12 @@ object RexNodeTranslator {
     name match {
       case BuiltInFunctionNames.SUBSTRING => SqlStdOperatorTable.SUBSTRING
       case BuiltInFunctionNames.TRIM => SqlStdOperatorTable.TRIM
+      case BuiltInFunctionNames.CHAR_LENGTH => SqlStdOperatorTable.CHAR_LENGTH
+      case BuiltInFunctionNames.UPPER_CASE => SqlStdOperatorTable.UPPER
+      case BuiltInFunctionNames.LOWER_CASE => SqlStdOperatorTable.LOWER
+      case BuiltInFunctionNames.INIT_CAP => SqlStdOperatorTable.INITCAP
+      case BuiltInFunctionNames.LIKE => SqlStdOperatorTable.LIKE
+      case BuiltInFunctionNames.SIMILAR => SqlStdOperatorTable.SIMILAR_TO
       case _ => ???
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b9b6f3/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
index eb48b70..a55daf7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
@@ -31,6 +31,10 @@ import org.junit.Test
 
 class ScalarFunctionsTest {
 
+  // ----------------------------------------------------------------------------------------------
+  // String functions
+  // ----------------------------------------------------------------------------------------------
+
   @Test
   def testSubstring(): Unit = {
     testFunction(
@@ -79,6 +83,117 @@ class ScalarFunctionsTest {
       "This is a test String")
   }
 
+  @Test
+  def testCharLength(): Unit = {
+    testFunction(
+      'f0.charLength(),
+      "f0.charLength()",
+      "CHAR_LENGTH(f0)",
+      "22")
+
+    testFunction(
+      'f0.charLength(),
+      "charLength(f0)",
+      "CHARACTER_LENGTH(f0)",
+      "22")
+  }
+
+  @Test
+  def testUpperCase(): Unit = {
+    testFunction(
+      'f0.upperCase(),
+      "f0.upperCase()",
+      "UPPER(f0)",
+      "THIS IS A TEST STRING.")
+  }
+
+  @Test
+  def testLowerCase(): Unit = {
+    testFunction(
+      'f0.lowerCase(),
+      "f0.lowerCase()",
+      "LOWER(f0)",
+      "this is a test string.")
+  }
+
+  @Test
+  def testInitCap(): Unit = {
+    testFunction(
+      'f0.initCap(),
+      "f0.initCap()",
+      "INITCAP(f0)",
+      "This Is A Test String.")
+  }
+
+  @Test
+  def testConcat(): Unit = {
+    testFunction(
+      'f0 + 'f0,
+      "f0 + f0",
+      "f0||f0",
+      "This is a test String.This is a test String.")
+  }
+
+  @Test
+  def testLike(): Unit = {
+    testFunction(
+      'f0.like("Th_s%"),
+      "f0.like('Th_s%')",
+      "f0 LIKE 'Th_s%'",
+      "true")
+
+    testFunction(
+      'f0.like("%is a%"),
+      "f0.like('%is a%')",
+      "f0 LIKE '%is a%'",
+      "true")
+  }
+
+  @Test
+  def testNotLike(): Unit = {
+    testFunction(
+      !'f0.like("Th_s%"),
+      "!f0.like('Th_s%')",
+      "f0 NOT LIKE 'Th_s%'",
+      "false")
+
+    testFunction(
+      !'f0.like("%is a%"),
+      "!f0.like('%is a%')",
+      "f0 NOT LIKE '%is a%'",
+      "false")
+  }
+
+  @Test
+  def testSimilar(): Unit = {
+    testFunction(
+      'f0.similar("_*"),
+      "f0.similar('_*')",
+      "f0 SIMILAR TO '_*'",
+      "true")
+
+    testFunction(
+      'f0.similar("This (is)? a (test)+ Strin_*"),
+      "f0.similar('This (is)? a (test)+ Strin_*')",
+      "f0 SIMILAR TO 'This (is)? a (test)+ Strin_*'",
+      "true")
+  }
+
+  @Test
+  def testNotSimilar(): Unit = {
+    testFunction(
+      !'f0.similar("_*"),
+      "!f0.similar('_*')",
+      "f0 NOT SIMILAR TO '_*'",
+      "false")
+
+    testFunction(
+      !'f0.similar("This (is)? a (test)+ Strin_*"),
+      "!f0.similar('This (is)? a (test)+ Strin_*')",
+      "f0 NOT SIMILAR TO 'This (is)? a (test)+ Strin_*'",
+      "false")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   def testFunction(


Mime
View raw message