flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject flink git commit: [FLINK-6040] [table] DataStreamUserDefinedFunctionITCase occasionally fails
Date Wed, 15 Mar 2017 01:13:50 GMT
Repository: flink
Updated Branches:
  refs/heads/master 90c7415b0 -> 6f7d7a025


[FLINK-6040] [table] DataStreamUserDefinedFunctionITCase occasionally fails

This closes #3530.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f7d7a02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f7d7a02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f7d7a02

Branch: refs/heads/master
Commit: 6f7d7a0253406bad1a00bc53e42b0b2096b4ebda
Parents: 90c7415
Author: Zhuoluo Yang <zhuoluo.yzl@alibaba-inc.com>
Authored: Tue Mar 14 18:08:14 2017 +0800
Committer: Kurt Young <kurt@apache.org>
Committed: Wed Mar 15 09:13:03 2017 +0800

----------------------------------------------------------------------
 .../DataStreamUserDefinedFunctionITCase.scala   | 36 ++++++--------------
 1 file changed, 11 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f7d7a02/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
index 853c771..2e8a065 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
@@ -27,18 +27,22 @@ import org.apache.flink.table.expressions.utils.{Func13, RichFunc2}
 import org.apache.flink.table.utils._
 import org.apache.flink.types.Row
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
 
 import scala.collection.mutable
 
 class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
 
-  @Test
-  def testCrossJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+  val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
+
+  @Before
+  def clear(): Unit = {
     StreamITCase.clear
+  }
 
+  @Test
+  def testCrossJoin(): Unit = {
     val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
     val func0 = new TableFunc0
     val pojoFunc0 = new PojoTableFunc()
@@ -60,10 +64,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
 
   @Test
   def testLeftOuterJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
     val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
     val func0 = new TableFunc0
 
@@ -83,8 +83,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
 
   @Test
   def testUserDefinedTableFunctionWithParameter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
     val tableFunc1 = new RichTableFunc1
     tEnv.registerFunction("RichTableFunc1", tableFunc1)
     UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " "))
@@ -105,8 +103,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
 
   @Test
   def testUserDefinedTableFunctionWithUserDefinedScalarFunction(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
     val tableFunc1 = new RichTableFunc1
     val richFunc2 = new RichFunc2
     tEnv.registerFunction("RichTableFunc1", tableFunc1)
@@ -137,10 +133,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
 
   @Test
   def testTableFunctionConstructorWithParams(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
     val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
     val config = Map("key1" -> "value1", "key2" -> "value2")
     val func30 = new TableFunc3(null)
@@ -172,10 +164,6 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
 
   @Test
   def testScalarFunctionConstructorWithParams(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
     val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
     val func0 = new Func13("default")
     val func1 = new Func13("Sunny")
@@ -197,13 +185,11 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
 
   @Test
   def testTableFunctionWithVariableArguments(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
     val varArgsFunc0 = new VarArgsFunc0
-    tableEnv.registerFunction("VarArgsFunc0", varArgsFunc0)
+    tEnv.registerFunction("VarArgsFunc0", varArgsFunc0)
 
     val result = testData(env)
-      .toTable(tableEnv, 'a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c)
       .select('c)
       .join(varArgsFunc0("1", "2", 'c))
 


Mime
View raw message