flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject flink git commit: [FLINK-5827] [table] Fix exception when applying a filter after joining a UDTF that returns a POJO type
Date Thu, 02 Mar 2017 15:33:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master f6c9b32c1 -> ff552b440


[FLINK-5827] [table] Fix exception when applying a filter after joining a UDTF that returns a POJO type

This closes #3357.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff552b44
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff552b44
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff552b44

Branch: refs/heads/master
Commit: ff552b440e3d493b41083b6b63534cfcd83961d9
Parents: f6c9b32
Author: kaibozhou <zkb555@gmail.com>
Authored: Fri Feb 24 11:32:34 2017 +0800
Committer: twalthr <twalthr@apache.org>
Committed: Thu Mar 2 16:27:27 2017 +0100

----------------------------------------------------------------------
 .../table/plan/nodes/CommonCorrelate.scala      |   2 +-
 .../DataSetUserDefinedFunctionITCase.scala      |   3 +-
 .../DataSetUserDefinedFunctionITCase.scala      | 206 ------------------
 .../DataStreamUserDefinedFunctionITCase.scala   | 211 +++++++++++++++++++
 4 files changed, 214 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff552b44/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 61b7ffb..6c4066b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -188,7 +188,7 @@ trait CommonCorrelate {
         |getCollector().collect(${crossResultExpr.resultTerm});
         |""".stripMargin
     } else {
-      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
+      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo, None, pojoFieldMapping)
       filterGenerator.input1Term = filterGenerator.input2Term
       val filterCondition = filterGenerator.generateExpression(condition.get)
       s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/ff552b44/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
index d268594..3d20803 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
@@ -140,11 +140,12 @@ class DataSetUserDefinedFunctionITCase(
     val pojo = new PojoTableFunc()
     val result = in
       .join(pojo('c))
+      .where(('age > 20))
       .select('c, 'name, 'age)
       .toDataSet[Row]
 
     val results = result.collect()
-    val expected = "Jack#22,Jack,22\n" + "John#19,John,19\n" + "Anna#44,Anna,44\n"
+    val expected = "Jack#22,Jack,22\n" + "Anna#44,Anna,44\n"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff552b44/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataSetUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataSetUserDefinedFunctionITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataSetUserDefinedFunctionITCase.scala
deleted file mode 100644
index 21b87e9..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataSetUserDefinedFunctionITCase.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.datastream
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.table.expressions.utils.{Func13, RichFunc2}
-import org.apache.flink.table.utils.{RichTableFunc1, TableFunc0, TableFunc3, UserDefinedFunctionTestUtils}
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class DataSetUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testCrossJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = t
-      .join(func0('c) as('d, 'e))
-      .select('c, 'd, 'e)
-      .toDataStream[Row]
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19", "Anna#44,Anna,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testLeftOuterJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = t
-      .leftOuterJoin(func0('c) as('d, 'e))
-      .select('c, 'd, 'e)
-      .toDataStream[Row]
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "nosharp,null,null", "Jack#22,Jack,22",
-      "John#19,John,19", "Anna#44,Anna,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUserDefinedTableFunctionWithParameter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val tableFunc1 = new RichTableFunc1
-    tEnv.registerFunction("RichTableFunc1", tableFunc1)
-    UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " "))
-    StreamITCase.testResults = mutable.MutableList()
-
-    val result = StreamTestData.getSmall3TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .join(tableFunc1('c) as 's)
-      .select('a, 's)
-
-    val results = result.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("3,Hello", "3,world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUserDefinedTableFunctionWithUserDefinedScalarFunction(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val tableFunc1 = new RichTableFunc1
-    val richFunc2 = new RichFunc2
-    tEnv.registerFunction("RichTableFunc1", tableFunc1)
-    tEnv.registerFunction("RichFunc2", richFunc2)
-    UserDefinedFunctionTestUtils.setJobParameters(
-      env,
-      Map("word_separator" -> "#", "string.value" -> "test"))
-    StreamITCase.testResults = mutable.MutableList()
-
-    val result = StreamTestData.getSmall3TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .join(tableFunc1(richFunc2('c)) as 's)
-      .select('a, 's)
-
-    val results = result.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,Hi",
-      "1,test",
-      "2,Hello",
-      "2,test",
-      "3,Hello world",
-      "3,test")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testTableFunctionConstructorWithParams(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
-    val config = Map("key1" -> "value1", "key2" -> "value2")
-    val func30 = new TableFunc3(null)
-    val func31 = new TableFunc3("OneConf_")
-    val func32 = new TableFunc3("TwoConf_", config)
-
-    val result = t
-      .join(func30('c) as('d, 'e))
-      .select('c, 'd, 'e)
-      .join(func31('c) as ('f, 'g))
-      .select('c, 'd, 'e, 'f, 'g)
-      .join(func32('c) as ('h, 'i))
-      .select('c, 'd, 'f, 'h, 'e, 'g, 'i)
-      .toDataStream[Row]
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "Anna#44,Anna,OneConf_Anna,TwoConf__key=key1_value=value1_Anna,44,44,44",
-      "Anna#44,Anna,OneConf_Anna,TwoConf__key=key2_value=value2_Anna,44,44,44",
-      "Jack#22,Jack,OneConf_Jack,TwoConf__key=key1_value=value1_Jack,22,22,22",
-      "Jack#22,Jack,OneConf_Jack,TwoConf__key=key2_value=value2_Jack,22,22,22",
-      "John#19,John,OneConf_John,TwoConf__key=key1_value=value1_John,19,19,19",
-      "John#19,John,OneConf_John,TwoConf__key=key2_value=value2_John,19,19,19"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testScalarFunctionConstructorWithParams(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
-    val func0 = new Func13("default")
-    val func1 = new Func13("Sunny")
-    val func2 = new Func13("kevin2")
-
-    val result = t.select(func0('c), func1('c),func2('c))
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "default-Anna#44,Sunny-Anna#44,kevin2-Anna#44",
-      "default-Jack#22,Sunny-Jack#22,kevin2-Jack#22",
-      "default-John#19,Sunny-John#19,kevin2-John#19",
-      "default-nosharp,Sunny-nosharp,kevin2-nosharp"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  private def testData(
-      env: StreamExecutionEnvironment)
-    : DataStream[(Int, Long, String)] = {
-
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Jack#22"))
-    data.+=((2, 2L, "John#19"))
-    data.+=((3, 2L, "Anna#44"))
-    data.+=((4, 3L, "nosharp"))
-    env.fromCollection(data)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff552b44/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
new file mode 100644
index 0000000..e7ce457
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.datastream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.table.expressions.utils.{Func13, RichFunc2}
+import org.apache.flink.table.utils.{RichTableFunc1, TableFunc0, TableFunc3, UserDefinedFunctionTestUtils}
+import org.apache.flink.table.utils.PojoTableFunc
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testCrossJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+    val pojoFunc0 = new PojoTableFunc()
+
+    val result = t
+      .join(func0('c) as('d, 'e))
+      .select('c, 'd, 'e)
+      .join(pojoFunc0('c))
+      .where(('age > 20))
+      .select('c, 'name, 'age)
+      .toDataStream[Row]
+
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testLeftOuterJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    val result = t
+      .leftOuterJoin(func0('c) as('d, 'e))
+      .select('c, 'd, 'e)
+      .toDataStream[Row]
+
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "nosharp,null,null", "Jack#22,Jack,22",
+      "John#19,John,19", "Anna#44,Anna,44")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUserDefinedTableFunctionWithParameter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tableFunc1 = new RichTableFunc1
+    tEnv.registerFunction("RichTableFunc1", tableFunc1)
+    UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " "))
+    StreamITCase.testResults = mutable.MutableList()
+
+    val result = StreamTestData.getSmall3TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .join(tableFunc1('c) as 's)
+      .select('a, 's)
+
+    val results = result.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,Hello", "3,world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUserDefinedTableFunctionWithUserDefinedScalarFunction(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tableFunc1 = new RichTableFunc1
+    val richFunc2 = new RichFunc2
+    tEnv.registerFunction("RichTableFunc1", tableFunc1)
+    tEnv.registerFunction("RichFunc2", richFunc2)
+    UserDefinedFunctionTestUtils.setJobParameters(
+      env,
+      Map("word_separator" -> "#", "string.value" -> "test"))
+    StreamITCase.testResults = mutable.MutableList()
+
+    val result = StreamTestData.getSmall3TupleDataStream(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .join(tableFunc1(richFunc2('c)) as 's)
+      .select('a, 's)
+
+    val results = result.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,Hi",
+      "1,test",
+      "2,Hello",
+      "2,test",
+      "3,Hello world",
+      "3,test")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testTableFunctionConstructorWithParams(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val config = Map("key1" -> "value1", "key2" -> "value2")
+    val func30 = new TableFunc3(null)
+    val func31 = new TableFunc3("OneConf_")
+    val func32 = new TableFunc3("TwoConf_", config)
+
+    val result = t
+      .join(func30('c) as('d, 'e))
+      .select('c, 'd, 'e)
+      .join(func31('c) as ('f, 'g))
+      .select('c, 'd, 'e, 'f, 'g)
+      .join(func32('c) as ('h, 'i))
+      .select('c, 'd, 'f, 'h, 'e, 'g, 'i)
+      .toDataStream[Row]
+
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Anna#44,Anna,OneConf_Anna,TwoConf__key=key1_value=value1_Anna,44,44,44",
+      "Anna#44,Anna,OneConf_Anna,TwoConf__key=key2_value=value2_Anna,44,44,44",
+      "Jack#22,Jack,OneConf_Jack,TwoConf__key=key1_value=value1_Jack,22,22,22",
+      "Jack#22,Jack,OneConf_Jack,TwoConf__key=key2_value=value2_Jack,22,22,22",
+      "John#19,John,OneConf_John,TwoConf__key=key1_value=value1_John,19,19,19",
+      "John#19,John,OneConf_John,TwoConf__key=key2_value=value2_John,19,19,19"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testScalarFunctionConstructorWithParams(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new Func13("default")
+    val func1 = new Func13("Sunny")
+    val func2 = new Func13("kevin2")
+
+    val result = t.select(func0('c), func1('c),func2('c))
+
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "default-Anna#44,Sunny-Anna#44,kevin2-Anna#44",
+      "default-Jack#22,Sunny-Jack#22,kevin2-Jack#22",
+      "default-John#19,Sunny-John#19,kevin2-John#19",
+      "default-nosharp,Sunny-nosharp,kevin2-nosharp"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  private def testData(
+      env: StreamExecutionEnvironment)
+    : DataStream[(Int, Long, String)] = {
+
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Jack#22"))
+    data.+=((2, 2L, "John#19"))
+    data.+=((3, 2L, "Anna#44"))
+    data.+=((4, 3L, "nosharp"))
+    env.fromCollection(data)
+  }
+
+}


Mime
View raw message