flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [2/2] flink git commit: [FLINK-6892] [table] Add L/RPAD support in SQL
Date Wed, 24 Jan 2018 10:46:49 GMT
[FLINK-6892] [table] Add L/RPAD support in SQL

This closes #4127.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a11dd2c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a11dd2c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a11dd2c4

Branch: refs/heads/master
Commit: a11dd2c4cb297cd6677a7103a28e5d7888ec5711
Parents: f1e4d25
Author: sunjincheng121 <sunjincheng121@gmail.com>
Authored: Thu Jun 15 13:23:32 2017 +0800
Committer: twalthr <twalthr@apache.org>
Committed: Wed Jan 24 11:40:23 2018 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           | 21 +++++
 .../table/codegen/calls/BuiltInMethods.scala    | 13 ++++
 .../table/codegen/calls/FunctionGenerator.scala | 11 +++
 .../functions/sql/ScalarSqlFunctions.scala      | 18 +++++
 .../runtime/functions/ScalarFunctions.scala     | 82 +++++++++++++++++++-
 .../flink/table/validate/FunctionCatalog.scala  |  3 +
 .../table/expressions/ScalarFunctionsTest.scala | 25 ++++++
 7 files changed, 169 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 42b3965..9003ec6 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1755,6 +1755,27 @@ CONCAT_WS(separator, string1, string2,...)
       </td>
       <td>
         <p>Returns the string that results from concatenating the arguments using a
separator. The separator is added between the strings to be concatenated. Returns NULL If
the separator is NULL. CONCAT_WS() does not skip empty strings. However, it does skip any
NULL argument. E.g. <code>CONCAT_WS("~", "AA", "BB", "", "CC")</code> returns
<code>AA~BB~~CC</code></p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+LPAD(text string, len integer, pad string)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the string text, left-padded with the string pad to a length of
len characters. If text is longer than len, the return value is shortened to len characters.
Returns NULL if len is negative. E.g. <code>LPAD('hi',4,'??')</code> returns <code>??hi</code>;
<code>LPAD('hi',1,'??')</code> returns <code>h</code>.</p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        {% highlight text %}
+RPAD(text string, len integer, pad string)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the string text, right-padded with the string pad to a length of
len characters. If text is longer than len, the return value is shortened to len characters.
Returns NULL if len is negative. E.g. <code>RPAD('hi',4,'??')</code> returns <code>hi??</code>;
<code>RPAD('hi',1,'??')</code> returns <code>h</code>.</p>
       </td>
     </tr>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 6791e1b..ea4f0fd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -96,5 +96,18 @@ object BuiltInMethods {
     Types.lookupMethod(
       classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]])
 
+  // Method for ScalarFunctions.lpad(String, Integer, String); FunctionGenerator binds it to
+  // the SQL LPAD function.
+  val LPAD = Types.lookupMethod(
+    classOf[ScalarFunctions], "lpad", classOf[String], classOf[Integer], classOf[String])
+
+  // Method for ScalarFunctions.rpad(String, Integer, String); FunctionGenerator binds it to
+  // the SQL RPAD function.
+  val RPAD = Types.lookupMethod(
+    classOf[ScalarFunctions], "rpad", classOf[String], classOf[Integer], classOf[String])
+
   val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long])
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 0a2789f..412cdfc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -512,6 +512,17 @@ object FunctionGenerator {
     Seq(SqlTimeTypeInfo.TIMESTAMP, STRING_TYPE_INFO),
     new DateFormatCallGen
   )
+
+  // LPAD / RPAD (STRING, INT, STRING) -> STRING, implemented by ScalarFunctions.lpad / rpad.
+  // Registered in the same order as before: LPAD first, then RPAD.
+  Seq(
+    ScalarSqlFunctions.LPAD -> BuiltInMethods.LPAD,
+    ScalarSqlFunctions.RPAD -> BuiltInMethods.RPAD
+  ).foreach { case (sqlFunction, runtimeMethod) =>
+    addSqlFunctionMethod(
+      sqlFunction,
+      Seq(STRING_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO),
+      STRING_TYPE_INFO,
+      runtimeMethod)
+  }
 
   // ----------------------------------------------------------------------------------------------
   // Cryptographic Hash functions

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index 05881b3..891aba9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -67,6 +67,24 @@ object ScalarSqlFunctions {
       OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)),
     SqlFunctionCategory.NUMERIC)
 
+  // SQL LPAD(text, len, pad): returns a nullable VARCHAR; operands are CHARACTER, INTEGER,
+  // CHARACTER. Runtime implementation: ScalarFunctions.lpad.
+  val LPAD = new SqlFunction(
+    "LPAD",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(
+      ReturnTypes.explicit(SqlTypeName.VARCHAR),
+      SqlTypeTransforms.FORCE_NULLABLE),
+    null,
+    OperandTypes.family(
+      SqlTypeFamily.CHARACTER,
+      SqlTypeFamily.INTEGER,
+      SqlTypeFamily.CHARACTER),
+    SqlFunctionCategory.STRING)
+
+  // SQL RPAD(text, len, pad): same signature as LPAD. Runtime implementation:
+  // ScalarFunctions.rpad.
+  val RPAD = new SqlFunction(
+    "RPAD",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(
+      ReturnTypes.explicit(SqlTypeName.VARCHAR),
+      SqlTypeTransforms.FORCE_NULLABLE),
+    null,
+    OperandTypes.family(
+      SqlTypeFamily.CHARACTER,
+      SqlTypeFamily.INTEGER,
+      SqlTypeFamily.CHARACTER),
+    SqlFunctionCategory.STRING)
+
   val MD5 = new SqlFunction(
     "MD5",
     SqlKind.OTHER_FUNCTION,

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 36edba2..87e58a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -89,8 +89,7 @@ object ScalarFunctions {
   def log(x: Double): Double = {
     if (x <= 0.0) {
       throw new IllegalArgumentException(s"x of 'log(x)' must be > 0, but x = $x")
-    }
-    else {
+    } else {
       Math.log(x)
     }
   }
@@ -104,9 +103,84 @@ object ScalarFunctions {
     }
     if (base <= 1.0) {
       throw new IllegalArgumentException(s"base of 'log(base, x)' must be > 1, but base
= $base")
-    }
-    else {
+    } else {
       Math.log(x) / Math.log(base)
     }
   }
+
+  /**
+    * Returns the string base, left-padded with the string pad to a length of len characters.
+    * If base is longer than len, the return value is shortened to len characters.
+    *
+    * Returns null if len is negative, or if padding is required but pad is empty (an empty
+    * pad can never fill the gap; previously this case looped forever).
+    */
+  def lpad(base: String, len: Integer, pad: String): String = {
+    val total: Int = len
+    // Number of pad characters to prepend; 0 when base already fills (or exceeds) len.
+    val padLen = Math.max(total - base.length, 0)
+    if (total < 0 || (padLen > 0 && pad.isEmpty)) {
+      null
+    } else {
+      val data = new Array[Char](total)
+
+      // Fill the prefix by cycling through pad; the last repetition may be cut short.
+      var i = 0
+      while (i < padLen) {
+        data(i) = pad.charAt(i % pad.length)
+        i += 1
+      }
+
+      // Copy as much of base as still fits after the padding.
+      base.getChars(0, total - padLen, data, padLen)
+      new String(data)
+    }
+  }
+
+  /**
+    * Returns the string base, right-padded with the string pad to a length of len characters.
+    * If base is longer than len, the return value is shortened to len characters.
+    *
+    * Returns null if len is negative, or if padding is required but pad is empty (an empty
+    * pad can never fill the gap; previously this case looped forever).
+    */
+  def rpad(base: String, len: Integer, pad: String): String = {
+    val total: Int = len
+    if (total < 0 || (total > base.length && pad.isEmpty)) {
+      null
+    } else {
+      val data = new Array[Char](total)
+
+      // Copy as much of base as fits into the result.
+      val baseLen = Math.min(base.length, total)
+      base.getChars(0, baseLen, data, 0)
+
+      // Fill the remainder by cycling through pad; the last repetition may be cut short.
+      var i = baseLen
+      while (i < total) {
+        data(i) = pad.charAt((i - baseLen) % pad.length)
+        i += 1
+      }
+      new String(data)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index fb359c2..37df23a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -420,9 +420,12 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.BIN,
     SqlStdOperatorTable.TIMESTAMP_ADD,
     ScalarSqlFunctions.LOG,
+    ScalarSqlFunctions.LPAD,
+    ScalarSqlFunctions.RPAD,
     ScalarSqlFunctions.MD5,
     ScalarSqlFunctions.SHA1,
     ScalarSqlFunctions.SHA256,
+
     // EXTENSIONS
     BasicOperatorTable.TUMBLE,
     BasicOperatorTable.HOP,

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 79dc67c..e4f88b5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -353,6 +353,31 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
+  def testLPad(): Unit = {
+    // (SQL expression, expected string result), checked in order.
+    val cases = Seq(
+      "LPAD('hi',4,'??')" -> "??hi",
+      "LPAD('hi',1,'??')" -> "h",
+      "LPAD('',1,'??')" -> "?",
+      "LPAD('',30,'??')" -> "?" * 30,
+      "LPAD('111',-2,'??')" -> "null",
+      "LPAD(f33,1,'??')" -> "null",
+      // the escaped literal below is "ab"
+      "LPAD('\u0061\u0062',1,'??')" -> "a",
+      "LPAD('⎨⎨',1,'??')" -> "⎨")
+    cases.foreach { case (sqlExpr, expected) => testSqlApi(sqlExpr, expected) }
+  }
+
+  @Test
+  def testRPad(): Unit = {
+    // (SQL expression, expected string result), checked in order.
+    val cases = Seq(
+      "RPAD('hi',4,'??')" -> "hi??",
+      "RPAD('hi',1,'??')" -> "h",
+      "RPAD('',1,'??')" -> "?",
+      "RPAD('1',30,'??')" -> ("1" + "?" * 29),
+      "RPAD('111',-2,'??')" -> "null",
+      "RPAD(f33,1,'??')" -> "null",
+      // the escaped literal below is "ab"
+      "RPAD('\u0061\u0062',1,'??')" -> "a",
+      "RPAD('üö',1,'??')" -> "ü")
+    cases.foreach { case (sqlExpr, expected) => testSqlApi(sqlExpr, expected) }
+  }
+
+  @Test
   def testBin(): Unit = {
 
     testAllApis(


Mime
View raw message