flink-commits mailing list archives

From fhue...@apache.org
Subject [1/3] flink git commit: [FLINK-5223] [doc] Add documentation of UDTF in Table API & SQL
Date Tue, 13 Dec 2016 13:34:18 GMT
Repository: flink
Updated Branches:
  refs/heads/master 270140a1d -> 5baea3f2e


[FLINK-5223] [doc] Add documentation of UDTF in Table API & SQL

This closes #2956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c86efbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c86efbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c86efbb

Branch: refs/heads/master
Commit: 5c86efbb449c631aea0b1b490cec706ad7596b44
Parents: da4af12
Author: Jark Wu <wuchong.wc@alibaba-inc.com>
Authored: Wed Dec 7 21:18:58 2016 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Dec 13 14:13:17 2016 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md | 123 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c86efbb/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 2b42ab2..9271803 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -3897,6 +3897,129 @@ object TimestampModifier extends ScalarFunction {
 </div>
 </div>
 
+### User-defined Table Functions
+
+A user-defined table function is implemented similarly to a user-defined scalar function but
can return a set of values instead of a single value. The returned set of values can consist
of multiple columns and multiple rows similar to a standard table. A user-defined table function
works on zero, one, or multiple scalar values as input and returns multiple rows as output.
+
+In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.api.table.functions`
and implement (one or more) evaluation methods. The behavior of a table function is determined
by its evaluation methods. An evaluation method must be declared `public` and named `eval`.
Evaluation methods can be overloaded by implementing multiple methods named `eval`. The parameter
types of the evaluation methods determine all valid parameters of the table function. The
type of the returned table is determined by the generic type of `TableFunction`. Evaluation
methods emit output rows using the protected `collect(T)` method.
+
+In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)`
for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join`
operator (cross) joins each row from the outer table (table on the left of the operator) with
all rows produced by the table-valued function (which is on the right side of the operator).
The `leftOuterJoin` operator joins each row from the outer table (table on the left of the
operator) with all rows produced by the table-valued function (which is on the right side
of the operator) and preserves outer rows for which the table function returns an empty table.
In SQL, use `LATERAL TABLE(<TableFunction>)` either with `CROSS JOIN` or with `LEFT JOIN` and
an `ON TRUE` join condition (see examples below).
+
+The following examples show how to define a table-valued function and use it:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// the generic type "Tuple2<String, Integer>" determines that the returned table has two columns,
+// the first is a String type and the second is an Integer type
+public class Split extends TableFunction<Tuple2<String, Integer>> {
+    public void eval(String str) {
+        for (String s : str.split(" ")) {
+            // use collect(...) to emit an output row
+            collect(new Tuple2<String, Integer>(s, s.length()));
+        }
+    }
+}
+
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+Table myTable = ...         // table schema: [a: String]
+
+// register the function
+tableEnv.registerFunction("split", new Split());
+
+// use the function in Java Table API
+// use AS to rename column names
+myTable.join("split(a) as (word, length)").select("a, word, length");
+myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
+
+// use the function in SQL API, LATERAL and TABLE keywords are required
+// CROSS JOIN a table function (equivalent to "join" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// the generic type "(String, Integer)" determines that the returned table has two columns,
+// the first is a String type and the second is an Integer type
+class Split extends TableFunction[(String, Integer)] {
+  def eval(str: String): Unit = {
+    // use collect(...) to emit an output row
+    str.split(" ").foreach(x => collect((x, x.length)))
+  }
+}
+
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+val myTable = ...         // table schema: [a: String]
+
+// use the function in Scala Table API (Note: No registration required in Scala Table API)
+val split = new Split()
+// use AS to rename column names
+myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length);
+myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length);
+
+// register and use the function in SQL API, LATERAL and TABLE keywords are required
+tableEnv.registerFunction("split", new Split())
+// CROSS JOIN a table function (equivalent to "join" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+{% endhighlight %}
+**IMPORTANT:** Do not implement a `TableFunction` as a Scala object. A Scala object is a singleton
and will cause concurrency issues.
+</div>
+</div>
+
+Please note that POJO types do not have a deterministic field order. Therefore, you cannot
rename the fields of a POJO returned by a table function using `AS`.
+
+By default the result type of a `TableFunction` is determined by Flink's automatic type
extraction facilities. This works well for basic types and simple POJOs but might be wrong
for more complex, custom, or composite types. In such a case, the type of the result can be
manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
+
+The following example shows a `TableFunction` that returns a `Row` type, which requires
explicit type information. We define that the returned table type should be
`RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class CustomTypeSplit extends TableFunction<Row> {
+    public void eval(String str) {
+        for (String s : str.split(" ")) {
+            Row row = new Row(2);
+            row.setField(0, s);
+            row.setField(1, s.length());
+            collect(row);
+        }
+    }
+
+    @Override
+    public TypeInformation<Row> getResultType() {
+        return new RowTypeInfo(new TypeInformation[]{
+                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CustomTypeSplit extends TableFunction[Row] {
+  def eval(str: String): Unit = {
+    str.split(" ").foreach({ s =>
+      val row = new Row(2)
+      row.setField(0, s)
+      row.setField(1, s.length)
+      collect(row)
+    })
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO,
+                        BasicTypeInfo.INT_TYPE_INFO))
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
 ### Limitations
 
 The following operations are not supported yet:


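Reader's note (not part of the commit): the difference between `join` and `leftOuterJoin` described in the patch can be sketched in plain Java without any Flink dependency. Everything below is a hypothetical stand-in: `TableFunctionSketch`, `drain()`, and the `join(...)` helper are illustrative names, not Flink API. Unlike the `Split` in the patch, this version skips empty tokens so that an empty input string yields an empty table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LateralJoinSketch {

    // Hypothetical stand-in for Flink's TableFunction<T>: eval() emits rows via collect().
    abstract static class TableFunctionSketch<T> {
        private final List<T> emitted = new ArrayList<>();

        protected void collect(T row) {
            emitted.add(row);
        }

        // Returns the rows emitted since the last call and resets the buffer.
        List<T> drain() {
            List<T> rows = new ArrayList<>(emitted);
            emitted.clear();
            return rows;
        }
    }

    // Emits one (word, length) row per non-empty token (assumption: empty tokens are skipped).
    static class Split extends TableFunctionSketch<Object[]> {
        public void eval(String str) {
            for (String s : str.split(" ")) {
                if (!s.isEmpty()) {
                    collect(new Object[] {s, s.length()});
                }
            }
        }
    }

    // Joins every input row with the rows the function produces for it.
    // preserveOuterRows = false mimics "join"; true mimics "leftOuterJoin".
    static List<List<Object>> join(List<String> input, boolean preserveOuterRows) {
        Split split = new Split();
        List<List<Object>> result = new ArrayList<>();
        for (String a : input) {
            split.eval(a);
            List<Object[]> rows = split.drain();
            if (rows.isEmpty() && preserveOuterRows) {
                // Keep the outer row and pad the function's columns with nulls.
                result.add(Arrays.asList(a, null, null));
            }
            for (Object[] row : rows) {
                result.add(Arrays.asList(a, row[0], row[1]));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("hello world", "");
        System.out.println(join(input, false)); // two rows, both from "hello world"
        System.out.println(join(input, true));  // three rows, the last one null-padded
    }
}
```

With the input `["hello world", ""]`, the inner variant drops the empty string entirely, while the outer variant keeps it as a row whose `word` and `length` columns are null.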