flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: [tests] SumMinMaxITCase to use collect() rather than flakey temp files
Date Thu, 09 Apr 2015 20:13:31 GMT
Repository: flink
Updated Branches:
  refs/heads/master 91102695b -> e45f13f53


[tests] SumMinMaxITCase to use collect() rather than flakey temp files


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cff478d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cff478d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cff478d

Branch: refs/heads/master
Commit: 1cff478d3712e3e9de59f6cf296b0669572eff0b
Parents: 9110269
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Apr 8 15:39:01 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Apr 9 21:00:37 2015 +0200

----------------------------------------------------------------------
 .../api/scala/operators/SumMinMaxITCase.scala   | 51 ++++++--------------
 1 file changed, 16 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1cff478d/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index d57c6e2d7f..180486b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -20,35 +20,18 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase}
-import org.junit.{Test, After, Before, Rule}
-import org.junit.rules.TemporaryFolder
+import org.apache.flink.test.util.MultipleProgramsTestBase
+
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+import org.junit.Assert._
 
 import org.apache.flink.api.scala._
 
-
 @RunWith(classOf[Parameterized])
 class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = null
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after(): Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
 
   @Test
   def testFullAggregate(): Unit = {
@@ -56,18 +39,18 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)
 
-    val aggregateDs = ds
+    val aggregateDs : DataSet[(Int, Long)] = ds
       .sum(0)
       .andMax(1)
       // Ensure aggregate operator correctly copies other fields
       .filter(_._3 != null)
       .map{ t => (t._1, t._2) }
 
-    aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
 
-    env.execute()
+    val result: Seq[(Int, Long)] = aggregateDs.collect
 
-    expected = "231,6\n"
+    assertEquals(1, result.size)
+    assertEquals((231, 6L), result.head)
   }
 
   @Test
@@ -83,11 +66,10 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
       .filter(_._3 != null)
       .map { t => (t._2, t._1) }
 
-    aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-
-    env.execute()
+    val result : Seq[(Long, Int)] = aggregateDs.collect.sortWith((a, b) => a._1 < b._1)
 
-    expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+    val expected : Seq[(Long, Int)] = Seq((1L, 1), (2L, 5), (3L, 15), (4L, 34), (5L, 65), (6L, 111))
+    assertEquals(expected, result)
   }
 
   @Test
@@ -96,18 +78,17 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)
 
-    val aggregateDs = ds
+    val aggregateDs: DataSet[Int] = ds
       .groupBy(1)
       .min(0)
       .min(0)
       // Ensure aggregate operator correctly copies other fields
       .filter(_._3 != null)
-      .map { t => new Tuple1(t._1) }
-
-    aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+      .map { t => t._1 }
 
-    env.execute()
+    val result: Seq[Int] = aggregateDs.collect
 
-    expected = "1\n"
+    assertEquals(1, result.size)
+    assertEquals(Seq(1), result)
   }
 }


Mime
View raw message