flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [3/5] flink git commit: [FLINK-6886] [table] Fix conversion of Row Table to POJO.
Date Mon, 19 Jun 2017 22:18:50 GMT
[FLINK-6886] [table] Fix conversion of Row Table to POJO.

This closes #4102.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b91df2b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b91df2b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b91df2b

Branch: refs/heads/release-1.3
Commit: 8b91df2b3cd0c0ef733902ad742045b318bac0fd
Parents: 2321898
Author: sunjincheng121 <sunjincheng121@gmail.com>
Authored: Sat Jun 17 03:25:08 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Jun 19 22:43:03 2017 +0200

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      | 22 ++++++++--
 .../table/api/java/stream/sql/SqlITCase.java    |  8 ++--
 .../flink/table/api/java/utils/Pojos.java       | 45 ++++++++++++++++++++
 .../api/scala/stream/TableSourceITCase.scala    |  6 +--
 .../api/scala/stream/sql/OverWindowITCase.scala | 32 +++++++-------
 .../table/api/scala/stream/sql/SqlITCase.scala  | 20 ++++-----
 .../api/scala/stream/table/CalcITCase.scala     | 24 +++++------
 .../table/GroupWindowAggregationsITCase.scala   | 10 ++---
 .../scala/stream/table/OverWindowITCase.scala   | 10 ++---
 .../api/scala/stream/table/UnionITCase.scala    |  8 ++--
 .../api/scala/stream/utils/StreamITCase.scala   |  4 +-
 .../datastream/DataStreamAggregateITCase.scala  | 12 +++---
 .../datastream/DataStreamCalcITCase.scala       |  4 +-
 .../DataStreamUserDefinedFunctionITCase.scala   | 16 +++----
 .../datastream/TimeAttributesITCase.scala       | 45 +++++++++++++++++---
 15 files changed, 179 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index bc5038d..178bd9f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.api
 
 import _root_.java.lang.{Boolean => JBool}
 import _root_.java.util.concurrent.atomic.AtomicInteger
+import _root_.java.util.{List => JList}
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
 import org.apache.calcite.rel.{RelNode, RelVisitor}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
 import org.apache.calcite.sql.SqlKind
@@ -38,7 +39,7 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.calcite.RelTimeIndicatorConverter
+import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -604,7 +605,22 @@ abstract class StreamTableEnvironment(
       withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
     val relNode = table.getRelNode
     val dataStreamPlan = optimize(relNode, updatesAsRetraction)
-    translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)
+
+    // zip original field names with optimized field types
+    val fieldTypes = relNode.getRowType.getFieldList.asScala
+      .zip(dataStreamPlan.getRowType.getFieldList.asScala)
+      // get name of original plan and type of optimized plan
+      .map(x => (x._1.getName, x._2.getType))
+      // add field indexes
+      .zipWithIndex
+      // build new field types
+      .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2))
+
+    // build a record type from list of field types
+    val rowType = new RelRecordType(
+      fieldTypes.toList.asJava.asInstanceOf[JList[RelDataTypeField]])
+
+    translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index d827cd6..261e7e5 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -67,7 +67,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		Table result = tableEnv.sql(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
 		env.execute();
 
 		List<String> expected = new ArrayList<>();
@@ -92,7 +92,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		Table result = tableEnv.sql(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
 		env.execute();
 
 		List<String> expected = new ArrayList<>();
@@ -116,7 +116,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		Table result = tableEnv.sql(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
 		env.execute();
 
 		List<String> expected = new ArrayList<>();
@@ -147,7 +147,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		Table result = tableEnv.sql(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
 		env.execute();
 
 		List<String> expected = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
new file mode 100644
index 0000000..3048835
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/Pojos.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.utils;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+
+/**
+ * POJOs for table api testing.
+ */
+public class Pojos {
+
+	/**
+	 * Pojo1 for test.
+	 */
+	public static class Pojo1 implements Serializable {
+
+		public Timestamp ts;
+		public String msg;
+
+		@Override
+		public String toString() {
+			return "Pojo1{" +
+					"ts=" + ts +
+					", msg='" + msg + '\'' +
+					'}';
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 13ec2b4..a7037bb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -47,7 +47,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
     tEnv.sql(
       "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
       .toAppendStream[Row]
-      .addSink(new StreamITCase.StringSink)
+      .addSink(new StreamITCase.StringSink[Row])
 
     env.execute()
 
@@ -72,7 +72,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
       .where('id > 4)
       .select('last, 'score * 2)
       .toAppendStream[Row]
-      .addSink(new StreamITCase.StringSink)
+      .addSink(new StreamITCase.StringSink[Row])
 
     env.execute()
 
@@ -94,7 +94,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
     tEnv.scan(tableName)
       .where("amount > 4 && price < 9")
       .select("id, name")
-      .addSink(new StreamITCase.StringSink)
+      .addSink(new StreamITCase.StringSink[Row])
 
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 7ba5c16..64b7132 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
   }
 
@@ -93,7 +93,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "FROM MyTable"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -135,7 +135,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
       "FROM MyTable"
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -178,7 +178,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -205,7 +205,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -235,7 +235,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -260,7 +260,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "from T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
@@ -322,7 +322,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " FROM T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -383,7 +383,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "FROM T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -451,7 +451,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " FROM T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -512,7 +512,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "FROM T1"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -573,7 +573,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -639,7 +639,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -701,7 +701,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -762,7 +762,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -837,7 +837,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index bdc1fcc..55633ff 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -58,7 +58,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("MyTableRow", t)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("Hello,Worlds,1","Hello again,Worlds,2")
@@ -100,7 +100,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("MyTable", t)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("2,0", "4,1", "6,1")
@@ -121,7 +121,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("MyTable", t)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("3,2,Hello world")
@@ -142,7 +142,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerDataStream("MyTable", t)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("3,2,Hello world")
@@ -166,7 +166,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T2", t2)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -193,7 +193,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T2", t2)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -219,7 +219,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List("Hello", "Hello world")
@@ -244,7 +244,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -276,7 +276,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
@@ -306,7 +306,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
 
     val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
index b355cf0..adf4d44 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -42,7 +42,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -61,7 +61,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("1", "2", "3")
@@ -80,7 +80,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
       .select('a, 'b)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -100,7 +100,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
       .select('a, 'b, 'c)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -119,7 +119,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("no")
@@ -135,7 +135,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("no")
@@ -152,7 +152,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
 
     val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("no")
@@ -172,7 +172,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter('a === 3)
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("3,2,Hello world")
@@ -192,7 +192,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter( Literal(false) )
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     assertEquals(true, StreamITCase.testResults.isEmpty)
@@ -211,7 +211,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter( Literal(true) )
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -234,7 +234,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter( 'a % 2 === 0 )
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -258,7 +258,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
 
     val filterDs = ds.filter( 'a % 2 !== 0)
     val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
     val expected = mutable.MutableList(
       "1,1,Hi", "3,2,Hello world",

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 87a35bf..d78aea6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -71,7 +71,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
     val results = windowedTable.toAppendStream[Row](queryConfig)
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
@@ -113,7 +113,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
@@ -139,7 +139,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
     val results = windowedTable.toAppendStream[Row](queryConfig)
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq("2,1,1,1", "2,2,6,2")
@@ -167,7 +167,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
               weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -203,7 +203,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       .select(weightAvgFun('long, 'int))
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq("12", "8", "2", "3", "1")

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
index f396896..0850ce1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
@@ -69,7 +69,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .select('c, 'mycount, 'wAvg)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -124,7 +124,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
         weightAvgFun('b, 'a) over 'w)
 
     val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -179,7 +179,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
       .select('a, 'c.sum over 'w, 'c.min over 'w)
     val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -242,7 +242,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
 
     val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -305,7 +305,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
 
     val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
index 2b496e3..4623b44 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
@@ -44,7 +44,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
     val unionDs = ds1.unionAll(ds2).select('c)
 
     val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -64,7 +64,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
     val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
 
     val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Hi", "Hallo")
@@ -83,7 +83,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
     val unionDs = ds1.unionAll(ds2)
 
     val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     assertEquals(true, StreamITCase.testResults.isEmpty)
@@ -102,7 +102,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
     val unionDs = ds1.unionAll(ds2)
 
     val results = unionDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     println(StreamITCase.testResults)

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index effde8e..4c6e6f0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -44,8 +44,8 @@ object StreamITCase {
     assertEquals(expected.asScala, StreamITCase.testResults.sorted)
   }
 
-  final class StringSink extends RichSinkFunction[Row]() {
-    def invoke(value: Row) {
+  final class StringSink[T] extends RichSinkFunction[T]() {
+    def invoke(value: T) {
       testResults.synchronized {
         testResults += value.toString
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
index 3ac664d..8837e03 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -70,7 +70,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -105,7 +105,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -142,7 +142,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -176,7 +176,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -205,7 +205,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -233,7 +233,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
     val expected = Seq(
       "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index 12d7202..dcd7800 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -49,7 +49,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
       .select('c)
 
     val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Hello")
@@ -72,7 +72,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
       .select('c)
 
     val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Hello", "Hello world")

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
index 9efe6a1..53caee3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
@@ -55,7 +55,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c, 'name, 'age)
       .toAppendStream[Row]
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
@@ -72,7 +72,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c, 'd, 'e)
       .toAppendStream[Row]
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -92,7 +92,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c, 'd, 'e)
       .toAppendStream[Row]
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19")
@@ -112,7 +112,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('a, 's)
 
     val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList("3,Hello", "3,world")
@@ -136,7 +136,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('a, 's)
 
     val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -166,7 +166,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c, 'd, 'f, 'h, 'e, 'g, 'i)
       .toAppendStream[Row]
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -189,7 +189,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
 
     val result = t.select(func0('c), func1('c),func2('c))
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
@@ -211,7 +211,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
       .select('c)
       .join(varArgsFunc0("1", "2", 'c))
 
-    result.addSink(new StreamITCase.StringSink)
+    result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(

http://git-wip-us.apache.org/repos/asf/flink/blob/8b91df2b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index c25dfdf..73cb701 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.java.utils.Pojos.Pojo1
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
@@ -104,7 +105,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val t = table.select('rowtime.cast(Types.STRING))
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -135,7 +136,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -162,7 +163,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val t = table.join(func('rowtime, 'proctime, 'string) as 's).select('rowtime, 's)
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -197,7 +198,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       .select('w.rowtime, 's.count)
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -223,7 +224,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val t = table.unionAll(table).select('rowtime)
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -261,7 +262,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -294,7 +295,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       .select('w2.rowtime, 'w2.end, 'int.count)
 
     val results = t.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
@@ -306,6 +307,36 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testCalcMaterializationWithPojoType(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+    tEnv.registerTable("T1", table)
+    val querySql = "select rowtime as ts, string as msg from T1"
+
+    val results = tEnv.sql(querySql).toAppendStream[Pojo1]
+    results.addSink(new StreamITCase.StringSink[Pojo1])
+    env.execute()
+
+    val expected = Seq(
+      "Pojo1{ts=1970-01-01 00:00:00.001, msg='Hi'}",
+      "Pojo1{ts=1970-01-01 00:00:00.002, msg='Hallo'}",
+      "Pojo1{ts=1970-01-01 00:00:00.003, msg='Hello'}",
+      "Pojo1{ts=1970-01-01 00:00:00.004, msg='Hello'}",
+      "Pojo1{ts=1970-01-01 00:00:00.007, msg='Hello'}",
+      "Pojo1{ts=1970-01-01 00:00:00.008, msg='Hello world'}",
+      "Pojo1{ts=1970-01-01 00:00:00.016, msg='Hello world'}")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
 
 object TimeAttributesITCase {


Mime
View raw message