flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-2498] [tests] Clean up GroupReduceITCase and make it more robust by avoiding temp files
Date Mon, 10 Aug 2015 18:48:07 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0ce757fd7 -> ceb5c5e2f


[FLINK-2498] [tests] Clean up GroupReduceITCase and make it more robust by avoiding temp files


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceb5c5e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceb5c5e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceb5c5e2

Branch: refs/heads/master
Commit: ceb5c5e2f442bfecd5a5dbbc6feb5cfb58cf267b
Parents: 0ce757f
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Aug 10 18:33:17 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 10 18:33:17 2015 +0200

----------------------------------------------------------------------
 .../api/scala/operators/GroupReduceITCase.scala | 821 ++++++++++---------
 1 file changed, 450 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ceb5c5e2/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index 09f0c51..559d1d1 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -21,71 +21,53 @@ import java.lang.Iterable
 
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.util.CollectionDataSets.{CrazyNested, POJO, MutableTuple3,
-CustomType}
+import org.apache.flink.api.scala.util.CollectionDataSets.{MutableTuple3, CustomType}
 import org.apache.flink.optimizer.Optimizer
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.util.Collector
-import org.hamcrest.core.{IsNot, IsEqual}
-import org.junit._
-import org.junit.rules.TemporaryFolder
+
+import org.junit.Test
+import org.junit.Assert._
+import org.junit.Assume._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
 import scala.collection.JavaConverters._
 
-import org.apache.flink.api.scala._
-
 @RunWith(classOf[Parameterized])
 class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = null
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after(): Unit = {
-    if(expected != null) {
-      TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
-    }
-  }
-
+  
+  /**
+   * check correctness of groupReduce on tuples with key field selector
+   */
   @Test
   def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector(): Unit = {
-    /*
-     * check correctness of groupReduce on tuples with key field selector
-     */
+
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds =  CollectionDataSets.get3TupleDataSet(env)
     val reduceDs =  ds.groupBy(1).reduceGroup {
       in =>
         in.map(t => (t._1, t._2)).reduce((l, r) => (l._1 + r._1, l._2))
     }
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1\n" + "5,2\n" + "15,3\n" + "34,4\n" + "65,5\n" + "111,6\n"
+    val result: Seq[(Int, Long)] = reduceDs.collect().sortBy(_._1)
+    
+    val expected = Seq[(Int, Long)]( (1,1), (5,2), (15,3), (34,4), (65,5), (111,6) )
+
+    assertEquals(expected, result)
   }
 
+  /**
+   * check correctness of groupReduce on tuples with multiple key field selector
+   */
   @Test
   def testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelector(): Unit = {
-    /*
-     * check correctness of groupReduce on tuples with multiple key field selector
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds =  CollectionDataSets
-      .get5TupleDataSet(env)
+    val ds =  CollectionDataSets.get5TupleDataSet(env)
+    
     val reduceDs =  ds.groupBy(4, 0).reduceGroup {
       in =>
         val (i, l, l2) = in
@@ -93,18 +75,29 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
           .reduce((l, r) => (l._1, l._2 + r._2, l._3))
         (i, l, 0, "P-)", l2)
     }
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,0,P-),1\n" + "2,3,0,P-),1\n" + "2,2,0,P-),2\n" + "3,9,0,P-),2\n" + "3,6,0,"
+
-      "P-),3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,0,P-),1\n" + "5,29,0,P-),"
+
-      "2\n" + "5,25,0,P-),3\n"
-  }
-
+    
+    val result: Seq[(Int, Long, Int, String, Long)] = reduceDs.collect().sortBy( t=> (t._1, t._5))
+
+    val expected = Seq[(Int, Long, Int, String, Long)](
+      (1,1,0,"P-)",1),
+      (2,3,0,"P-)",1),
+      (2,2,0,"P-)",2),
+      (3,9,0,"P-)",2),
+      (3,6,0,"P-)",3),
+      (4,17,0,"P-)",1),
+      (4,17,0,"P-)",2),
+      (5,11,0,"P-)",1),
+      (5,29,0,"P-)",2),
+      (5,25,0,"P-)",3) )
+    
+    assertEquals(expected, result)
+  }
+
+  /**
+   * check correctness of groupReduce on tuples with key field selector and group sorting
+   */
   @Test
   def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting(): Unit = {
-    /*
-     * check correctness of groupReduce on tuples with key field selector and group sorting
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(1)
     val ds =  CollectionDataSets.get3TupleDataSet(env)
@@ -112,37 +105,45 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
       in =>
         in.reduce((l, r) => (l._1 + r._1, l._2, l._3 + "-" + r._3))
     }
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" +
-      "5,2,Hello-Hello world\n" +
-      "15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n" +
-      "34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
-      "65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
-      "111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n"
-  }
-
+    
+    val result: Seq[(Int, Long, String)] = reduceDs.collect().sortBy(_._1)
+    
+    val expected = Seq[(Int, Long, String)] (
+      (1,1,"Hi"),
+      (5,2,"Hello-Hello world"),
+      (15,3,"Hello world, how are you?-I am fine.-Luke Skywalker"),
+      (34,4,"Comment#1-Comment#2-Comment#3-Comment#4"),
+      (65,5,"Comment#5-Comment#6-Comment#7-Comment#8-Comment#9"),
+      (111,6,"Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15") )
+    
+    assertEquals(expected, result)
+  }
+
+  /**
+   * check correctness of groupReduce on tuples with key extractor
+   */
   @Test
   def testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor(): Unit = {
-    /*
-     * check correctness of groupReduce on tuples with key extractor
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds =  CollectionDataSets.get3TupleDataSet(env)
     val reduceDs =  ds.groupBy(_._2).reduceGroup {
       in =>
         in.map(t => (t._1, t._2)).reduce((l, r) => (l._1 + r._1, l._2))
     }
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1\n" + "5,2\n" + "15,3\n" + "34,4\n" + "65,5\n" + "111,6\n"
+    
+    val result: Seq[(Int, Long)] = reduceDs.collect().sortBy(_._1)
+    
+    val expected = Seq[(Int, Long)]( (1,1), (5,2), (15,3), (34,4), (65,5), (111,6) )
+    
+    assertEquals(expected, result)
   }
 
+  /**
+   * check correctness of groupReduce on custom type with type extractor
+   */
   @Test
   def testCorrectnessOfGroupReduceOnCustomTypeWithTypeExtractor(): Unit = {
-    /*
-     * check correctness of groupReduce on custom type with type extractor
-     */
+
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds =  CollectionDataSets.getCustomTypeDataSet(env)
     val reduceDs =  ds.groupBy(_.myInt).reduceGroup {
@@ -161,17 +162,20 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
         }
         o
     }
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,0,Hello!\n" + "2,3,Hello!\n" + "3,12,Hello!\n" + "4,30,Hello!\n" + "5,60,"
+
-      "Hello!\n" + "6,105,Hello!\n"
+    
+    val result: Seq[String] = reduceDs.map(_.toString).collect().sorted
+
+    val expected = Seq[String]( "1,0,Hello!", "2,3,Hello!", "3,12,Hello!",
+      "4,30,Hello!", "5,60,Hello!", "6,105,Hello!")
+    
+    assertEquals(expected, result)
   }
 
+  /**
+   * check correctness of all-groupreduce for tuples
+   */
   @Test
   def testCorrectnessOfAllGroupReduceForTuples(): Unit = {
-    /*
-     * check correctness of all-groupreduce for tuples
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds =  CollectionDataSets.get3TupleDataSet(env)
     val reduceDs =  ds.reduceGroup {
@@ -184,16 +188,17 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
         }
         (i, l, "Hello World")
     }
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "231,91,Hello World\n"
+    
+    val result: (Int, Long, String) = reduceDs.collect().head
+    val expected: (Int, Long, String) = (231,91,"Hello World")
+    assertEquals(expected, result)
   }
 
+  /**
+   * check correctness of all-groupreduce for custom types
+   */
   @Test
   def testCorrectnessOfAllGroupReduceForCustomTypes(): Unit = {
-    /*
-     * check correctness of all-groupreduce for custom types
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds =  CollectionDataSets.getCustomTypeDataSet(env)
     val reduceDs =  ds.reduceGroup {
@@ -205,16 +210,17 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
         }
         o
     }
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "91,210,Hello!"
+    val result : String = reduceDs.collect().head.toString()
+    val expected = "91,210,Hello!"
+    assertEquals(expected, result)
   }
 
+  /**
+   * check correctness of groupReduce with broadcast set
+   */
   @Test
   def testCorrectnessOfGroupReduceWithBroadcastSet(): Unit = {
-    /*
-     * check correctness of groupReduce with broadcast set
-     */
+
     val env = ExecutionEnvironment.getExecutionEnvironment
     val intDs =  CollectionDataSets.getIntDataSet(env)
     val ds =  CollectionDataSets.get3TupleDataSet(env)
@@ -239,20 +245,30 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
           out.collect((i, l, f2Replace))
         }
       }).withBroadcastSet(intDs, "ints")
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,55\n" + "5,2,55\n" + "15,3,55\n" + "34,4,55\n" + "65,5,55\n" + "111,6,55\n"
-  }
-
+    
+    
+    val result: Seq[(Int, Long, String)] = reduceDs.collect().sortBy(_._1)
+    val expected = Seq[(Int, Long, String)](
+      (1,1,"55"),
+      (5,2,"55"),
+      (15,3,"55"),
+      (34,4,"55"), 
+      (65,5,"55"), 
+      (111,6,"55") )
+    
+    assertEquals(expected, result)
+  }
+
+  /**
+   * check correctness of groupReduce if UDF returns input objects multiple times and
+   * changes it in between
+   */
   @Test
   def testCorrectnessOfGroupReduceIfUDFReturnsInputObjectMultipleTimesWhileChangingIt(): Unit = {
-    /*
-     * check correctness of groupReduce if UDF returns input objects multiple times and
-     * changes it in between
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds =  CollectionDataSets.get3TupleDataSet(env)
       .map( t => MutableTuple3(t._1, t._2, t._3) )
+    
     val reduceDs =  ds.groupBy(1).reduceGroup {
       (in, out: Collector[MutableTuple3[Int, Long, String]]) =>
         for (t <- in) {
@@ -266,59 +282,75 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
           }
         }
     }
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "11,1,Hi!\n" + "21,1,Hi again!\n" + "12,2,Hi!\n" + "22,2,Hi again!\n" + "13,2,"
+
-      "Hi!\n" + "23,2,Hi again!\n"
-  }
-
+    val result: Seq[String] = reduceDs.collect().map(x => s"${x._1},${x._2},${x._3}").sorted
+    
+    val expected = Seq[String](
+      "11,1,Hi!", "21,1,Hi again!",
+      "12,2,Hi!", "22,2,Hi again!",
+      "13,2,Hi!", "23,2,Hi again!").sorted
+
+    assertEquals(expected, result)
+  }
+  
+  /**
+   * check correctness of groupReduce on custom type with key extractor and combine
+   */
   @Test
   def testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine(): Unit = {
-    /*
-     * check correctness of groupReduce on custom type with key extractor and combine
-     */
-    org.junit.Assume.assumeThat(mode, new IsNot(new IsEqual(TestExecutionMode.COLLECTION)))
+
+    org.junit.Assume.assumeFalse(mode == TestExecutionMode.COLLECTION)
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds =  CollectionDataSets.getCustomTypeDataSet(env)
 
-    val reduceDs =  ds.groupBy(_.myInt).reduceGroup(new CustomTypeGroupReduceWithCombine)
-
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected =
-      "1,0,test1\n" + "2,3,test2\n" + "3,12,test3\n" + "4,30,test4\n" + "5,60," +
-        "test5\n" + "6,105,test6\n"
+    val reduceDs =  ds.groupBy(_.myInt).reduceGroup(new CustomTypeGroupReduceWithCombine())
+    
+    val result: Seq[String] = reduceDs.collect().map(_.toString()).sorted
+    
+    val expected = Seq[String](
+      "1,0,test1",
+      "2,3,test2",
+      "3,12,test3",
+      "4,30,test4",
+      "5,60,test5",
+      "6,105,test6")
 
+    assertEquals(expected, result)
   }
 
+  /**
+   * check correctness of groupReduce on tuples with combine
+   */
   @Test
   def testCorrectnessOfGroupReduceOnTuplesWithCombine(): Unit = {
-    /*
-     * check correctness of groupReduce on tuples with combine
-     */
-    org.junit.Assume.assumeThat(mode, new IsNot(new IsEqual(TestExecutionMode.COLLECTION)))
+
+    org.junit.Assume.assumeFalse(mode == TestExecutionMode.COLLECTION)
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     // important because it determines how often the combiner is called
     env.setParallelism(2)
     val ds =  CollectionDataSets.get3TupleDataSet(env)
 
-    val reduceDs =  ds.groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine)
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected =
-      "1,test1\n" + "5,test2\n" + "15,test3\n" + "34,test4\n" + "65,test5\n" + "111," +
-        "test6\n"
-
-  }
-
+    val reduceDs =  ds.groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine())
+    
+    val result: Seq[(Int, String)] = reduceDs.collect().sortBy(_._1)
+    val expected = Seq[(Int, String)](
+      (1,"test1"),
+      (5,"test2"),
+      (15,"test3"),
+      (34,"test4"),
+      (65,"test5"),
+      (111,"test6") )
+    assertEquals(expected, result)
+  }
+
+  /**
+   * check correctness of all-groupreduce for tuples with combine
+   */
   @Test
   def testCorrectnessOfAllGroupReduceForTuplesWithCombine(): Unit = {
-    /*
-     * check correctness of all-groupreduce for tuples with combine
-     */
-    org.junit.Assume.assumeThat(mode, new IsNot(new IsEqual(TestExecutionMode.COLLECTION)))
+
+    org.junit.Assume.assumeFalse(mode == TestExecutionMode.COLLECTION)
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds =  CollectionDataSets.get3TupleDataSet(env).map(t => t).setParallelism(4)
@@ -328,42 +360,45 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
 
     val reduceDs =  ds.reduceGroup(new Tuple3AllGroupReduceWithCombine).withParameters(cfg)
 
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "322," +
-        "testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n"
-
+    val result: (Int, String) = reduceDs.collect().head
+    val expected = (322,
+        "testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest")
+    assertEquals(expected, result)
   }
 
+  /**
+   * check correctness of groupReduce with descending group sort
+   */
   @Test
   def testCorrectnessOfGroupReduceWithDescendingGroupSort(): Unit = {
-    /*
-     * check correctness of groupReduce with descending group sort
-     */
+
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.get3TupleDataSet(env)
     val reduceDs =  ds.groupBy(1).sortGroup(2, Order.DESCENDING).reduceGroup {
       in =>
         in.reduce((l, r) => (l._1 + r._1, l._2, l._3 + "-" + r._3))
     }
 
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "5,2,Hello world-Hello\n" + "15,3,Luke Skywalker-I am fine.-Hello " +
-      "world, how are you?\n" + "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" + "65,5,"
+
-      "Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" + "111,6," +
-      "Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n"
+    val result: Seq[(Int, Long, String)] = reduceDs.collect().sortBy(_._1)
+    val expected = Seq[(Int, Long, String)](
+      (1,1,"Hi"),
+      (5,2,"Hello world-Hello"),
+      (15,3,"Luke Skywalker-I am fine.-Hello world, how are you?"),
+      (34,4,"Comment#4-Comment#3-Comment#2-Comment#1"),
+      (65,5,"Comment#9-Comment#8-Comment#7-Comment#6-Comment#5"),
+      (111,6,"Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10") )
+
+    assertEquals(expected, result)
   }
 
+  /**
+   * check correctness of groupReduce on tuples with tuple-returning key selector
+   */
   @Test
   def testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector(): Unit = {
-    /*
-     * check correctness of groupReduce on tuples with tuple-returning key selector
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds =  CollectionDataSets
-      .get5TupleDataSet(env)
+    val ds =  CollectionDataSets.get5TupleDataSet(env)
+    
     val reduceDs = ds.groupBy( t => (t._1, t._5)).reduceGroup {
       in =>
         val (i, l, l2) = in
@@ -372,42 +407,59 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
         (i, l, 0, "P-)", l2)
     }
 
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,0,P-),1\n" + "2,3,0,P-),1\n" + "2,2,0,P-),2\n" + "3,9,0,P-),2\n" + "3,6,0,"
+
-      "P-),3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,0,P-),1\n" + "5,29,0,P-),"
+
-      "2\n" + "5,25,0,P-),3\n"
-  }
-
+    val result: Seq[(Int, Long, Int, String, Long)] = reduceDs.collect().sortBy(x => (x._1, x._5))
+    
+    val expected = Seq[(Int, Long, Int, String, Long)](
+      (1,1,0,"P-)",1),
+      (2,3,0,"P-)",1),
+      (2,2,0,"P-)",2),
+      (3,9,0,"P-)",2),
+      (3,6,0,"P-)",3),
+      (4,17,0,"P-)",1),
+      (4,17,0,"P-)",2),
+      (5,11,0,"P-)",1),
+      (5,29,0,"P-)",2),
+      (5,25,0,"P-)",3) )
+
+    assertEquals(expected, result)
+  }
+
+  /**
+   * check that input of combiner is also sorted for combinable groupReduce with group
+   * sorting
+   */
   @Test
   def testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting(): Unit = {
-    /*
-     * check that input of combiner is also sorted for combinable groupReduce with group
-     * sorting
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    val ds =  CollectionDataSets.get3TupleDataSet(env).map { t =>
+    val ds =  CollectionDataSets.get3TupleDataSet(env).map { t => 
       MutableTuple3(t._1, t._2, t._3)
     }
 
-    val reduceDs =  ds.groupBy(1)
-      .sortGroup(0, Order.ASCENDING).reduceGroup(new OrderCheckingCombinableReduce)
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "7,4," +
-      "Comment#1\n" + "11,5,Comment#5\n" + "16,6,Comment#10\n"
-  }
-
+    val reduceDs =  ds.groupBy(1).sortGroup(0, Order.ASCENDING)
+      .reduceGroup(new OrderCheckingCombinableReduce())
+    
+    
+    val result: Seq[String] = reduceDs.collect().sortBy(_._1).map(x => s"${x._1},${x._2},${x._3}")
+    
+    val expected = Seq[String] (
+      "1,1,Hi",
+      "2,2,Hello",
+      "4,3,Hello world, how are you?",
+      "7,4,Comment#1",
+      "11,5,Comment#5",
+      "16,6,Comment#10")
+    assertEquals(expected, result)
+  }
+
+  /**
+   * Deep nesting test
+   * + null value in pojo
+   */
   @Test
   def testDeepNestingAndNullValueInPojo(): Unit = {
-    /*
-     * Deep nesting test
-     * + null value in pojo
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds =  CollectionDataSets
-      .getCrazyNestedDataSet(env)
+    val ds =  CollectionDataSets.getCrazyNestedDataSet(env)
+    
     val reduceDs =  ds.groupBy("nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal")
       .reduceGroup {
       in =>
@@ -419,211 +471,236 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
         }
         (n, c)
     }
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "aa,1\nbb,2\ncc,3\n"
+    
+    val result: Seq[(String, Int)] = reduceDs.collect().sortBy(_._1)
+    val expected = Seq[(String, Int)](
+      ("aa",1),
+      ("bb",2),
+      ("cc",3) )
+    assertEquals(expected, result)
   }
 
+  /**
+   * Test Pojo containing a Writable and Tuples
+   */
   @Test
   def testPojoContainigAWritableAndTuples(): Unit = {
-    /*
-     * Test Pojo containing a Writable and Tuples
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds =  CollectionDataSets
-      .getPojoContainingTupleAndWritable(env)
-    val reduceDs =  ds.groupBy("hadoopFan", "theTuple.*").reduceGroup(new
-        GroupReduceFunction[CollectionDataSets.PojoContainingTupleAndWritable, Integer] {
-      def reduce(
-                  values: Iterable[CollectionDataSets.PojoContainingTupleAndWritable],
-                  out: Collector[Integer]) {
+    val ds =  CollectionDataSets.getPojoContainingTupleAndWritable(env)
+    
+    val reduceDs = ds.groupBy("hadoopFan", "theTuple.*").reduceGroup {
+      (values, out: Collector[Int]) => {
         var c: Int = 0
-        for (v <- values.asScala) {
+        for (v <- values) {
           c += 1
         }
         out.collect(c)
       }
-    })
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1\n5\n"
+    }
+    
+    val result: Seq[Int] = reduceDs.collect().sorted
+    val expected = Seq[Int](1, 5)
+    assertEquals(expected, result)
   }
 
+  /**
+   * Test Tuple containing pojos and regular fields
+   */
   @Test
   def testTupleContainingPojosAndRegularFields(): Unit ={
-    /*
-     * Test Tuple containing pojos and regular fields
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.getTupleContainingPojos(env)
-    val reduceDs =  ds.groupBy("_1", "_2.*").reduceGroup(
-      new GroupReduceFunction[(Int, CrazyNested, POJO), Int] {
-        def reduce(values: Iterable[(Int, CrazyNested, POJO)], out: Collector[Int]) {
-          var c: Int = 0
-          for (v <- values.asScala) {
-            c += 1
-          }
-          out.collect(c)
-        }
-      })
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "3\n1\n"
+    
+    val reduceDs =  ds.groupBy("_1", "_2.*").reduceGroup {
+      (values, out: Collector[Int]) => {
+        out.collect(values.size)
+      }
+    }
+    
+    val result: Seq[Int] = reduceDs.collect().sorted
+    val expected = Seq[Int](1, 3)
+    assertEquals(expected, result)
   }
 
+  /**
+   * Test string-based definition on group sort, based on test:
+   * check correctness of groupReduce with descending group sort
+   */
   @Test
   def testStringBasedDefinitionOnGroupSort(): Unit = {
-    /*
-     * Test string-based definition on group sort, based on test:
-     * check correctness of groupReduce with descending group sort
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.get3TupleDataSet(env)
+    
     val reduceDs =  ds.groupBy(1)
       .sortGroup("_3", Order.DESCENDING)
       .reduceGroup {
       in =>
         in.reduce((l, r) => (l._1 + r._1, l._2, l._3 + "-" + r._3))
     }
-    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "5,2,Hello world-Hello\n" + "15,3,Luke Skywalker-I am fine.-Hello " +
-      "world, how are you?\n" + "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" + "65,5,"
+
-      "Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" + "111,6," +
-      "Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n"
-  }
-
+    
+    val result: Seq[(Int, Long, String)] = reduceDs.collect().sortBy(_._1)
+    val expected = Seq[(Int, Long, String)](
+      (1,1,"Hi"),
+      (5,2,"Hello world-Hello"),
+      (15,3,"Luke Skywalker-I am fine.-Hello world, how are you?"),
+      (34,4,"Comment#4-Comment#3-Comment#2-Comment#1"),
+      (65,5,"Comment#9-Comment#8-Comment#7-Comment#6-Comment#5"),
+      (111,6,"Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10") )
+    assertEquals(expected, result)
+  }
+
+  /**
+   * Test int-based definition on group sort, for (full) nested Tuple
+   */
   @Test
   def testIntBasedDefinitionOnGroupSortForFullNestedTuple(): Unit = {
-    /*
-     * Test int-based definition on group sort, for (full) nested Tuple
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.getGroupSortedNestedTupleDataSet(env)
+    
     val reduceDs =  ds.groupBy("_2").sortGroup(0, Order.DESCENDING)
-      .reduceGroup(new NestedTupleReducer)
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "a--(2,1)-(1,3)-(1,2)-\n" + "b--(2,2)-\n" + "c--(4,9)-(3,6)-(3,3)-\n"
-  }
-
+      .reduceGroup(new NestedTupleReducer())
+    
+    val result: Seq[String] = reduceDs.map(_.toString()).collect().sorted
+    val expected = Seq[String](
+      "a--(2,1)-(1,3)-(1,2)-",
+      "b--(2,2)-",
+      "c--(4,9)-(3,6)-(3,3)-")
+    assertEquals(expected, result)
+  }
+
+  /**
+   * Test int-based definition on group sort, for (partial) nested Tuple ASC
+   */
   @Test
   def testIntBasedDefinitionOnGroupSortForPartialNestedTuple(): Unit = {
-    /*
-     * Test int-based definition on group sort, for (partial) nested Tuple ASC
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.getGroupSortedNestedTupleDataSet(env)
+    
     val reduceDs =  ds.groupBy("_2")
       .sortGroup("_1._1", Order.ASCENDING)
       .sortGroup("_1._2", Order.ASCENDING)
       .reduceGroup(new NestedTupleReducer)
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "a--(1,2)-(1,3)-(2,1)-\n" + "b--(2,2)-\n" + "c--(3,3)-(3,6)-(4,9)-\n"
+    
+    val result: Seq[String] = reduceDs.map(_.toString).collect().sorted
+    val expected = Seq[String](
+      "a--(1,2)-(1,3)-(2,1)-",
+      "b--(2,2)-",
+      "c--(3,3)-(3,6)-(4,9)-")
+    assertEquals(expected, result)
   }
 
+  /**
+   * Test string-based definition on group sort, for (partial) nested Tuple DESC
+   */
   @Test
   def testStringBasedDefinitionOnGroupSortForPartialNestedTuple(): Unit = {
-    /*
-     * Test string-based definition on group sort, for (partial) nested Tuple DESC
-     */
+    
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.getGroupSortedNestedTupleDataSet(env)
+    
     val reduceDs =  ds.groupBy("_2")
       .sortGroup("_1._1", Order.DESCENDING)
       .sortGroup("_1._2", Order.ASCENDING)
       .reduceGroup(new NestedTupleReducer)
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "a--(2,1)-(1,2)-(1,3)-\n" + "b--(2,2)-\n" + "c--(4,9)-(3,3)-(3,6)-\n"
+    
+    val result: Seq[String] = reduceDs.map(_.toString()).collect().sorted
+    val expected = Seq[String](
+      "a--(2,1)-(1,2)-(1,3)-",
+      "b--(2,2)-",
+      "c--(4,9)-(3,3)-(3,6)-")
+    assertEquals(expected, result)
   }
 
+  /**
+   * Test string-based definition on group sort, for two grouping keys
+   */
   @Test
   def testStringBasedDefinitionOnGroupSortForTwoGroupingKeys(): Unit = {
-    /*
-     * Test string-based definition on group sort, for two grouping keys
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.getGroupSortedNestedTupleDataSet(env)
+    
     val reduceDs =  ds.groupBy("_2")
       .sortGroup("_1._1", Order.DESCENDING)
       .sortGroup("_1._2", Order.DESCENDING)
-      .reduceGroup(new NestedTupleReducer)
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "a--(2,1)-(1,3)-(1,2)-\n" + "b--(2,2)-\n" + "c--(4,9)-(3,6)-(3,3)-\n"
-  }
-
+      .reduceGroup(new NestedTupleReducer())
+    
+    val result: Seq[String] = reduceDs.map(_.toString()).collect().sorted
+    val expected = Seq[String] (
+      "a--(2,1)-(1,3)-(1,2)-",
+      "b--(2,2)-",
+      "c--(4,9)-(3,6)-(3,3)-")
+    assertEquals(expected, result)
+  }
+
+  /**
+   * Test string-based definition on group sort, for two grouping keys with Pojos
+   */
   @Test
   def testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos(): Unit = {
-    /*
-     * Test string-based definition on group sort, for two grouping keys with Pojos
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(1)
     val ds =  CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env)
     val reduceDs =  ds.groupBy("hadoopFan")
       .sortGroup("theTuple._1", Order.DESCENDING)
       .sortGroup("theTuple._2", Order.DESCENDING)
-      .reduceGroup(
-        new GroupReduceFunction[CollectionDataSets.PojoContainingTupleAndWritable, String] {
-          def reduce(
-                      values: Iterable[CollectionDataSets.PojoContainingTupleAndWritable],
-                      out: Collector[String]) {
-            var once: Boolean = false
-            val concat: StringBuilder = new StringBuilder
-            for (value <- values.asScala) {
-              if (!once) {
-                concat.append(value.hadoopFan.get)
-                concat.append("---")
-                once = true
-              }
-              concat.append(value.theTuple)
-              concat.append("-")
+      .reduceGroup {
+        (values, out: Collector[String]) => {
+          var once: Boolean = false
+          val concat: StringBuilder = new StringBuilder
+          for (value <- values) {
+            if (!once) {
+              concat.append(value.hadoopFan.get)
+              concat.append("---")
+              once = true
             }
-            out.collect(concat.toString())
+            concat.append(value.theTuple)
+            concat.append("-")
           }
-        })
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1---(10,100)-\n" + "2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n"
+          out.collect(concat.toString())
+        }
+      }
+    
+    val result: Seq[String] = reduceDs.map(_.toString()).collect().sorted
+    val expected = Seq[String](
+      "1---(10,100)-",
+      "2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-")
+    assertEquals(expected, result)
   }
 
+  /**
+   * check correctness of sorted groupReduce on tuples with keyselector sorting
+   */
   @Test
-  def testTupleKeySelectorGroupSort: Unit = {
-    /*
-     * check correctness of sorted groupReduce on tuples with keyselector sorting
-     */
+  def testTupleKeySelectorGroupSort(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.get3TupleDataSet(env)
+    
     val reduceDs =  ds.groupBy(_._2).sortGroup(_._3, Order.DESCENDING).reduceGroup {
       in =>
         in.reduce((l, r) => (l._1 + r._1, l._2, l._3 + "-" + r._3))
     }
-    reduceDs.writeAsCsv(resultPath)
-    env.execute()
-    expected = "1,1,Hi\n" +
-      "5,2,Hello world-Hello\n" +
-      "15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
-      "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
-      "65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
-      "111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n"
-  }
-
-  @Test
-  def testPojoKeySelectorGroupSort: Unit = {
-    /*
+    
+    val result: Seq[(Int, Long, String)] = reduceDs.collect().sortBy(_._1)
+    
+    val expected = Seq[(Int, Long, String)](
+      (1,1,"Hi"),
+      (5,2,"Hello world-Hello"),
+      (15,3,"Luke Skywalker-I am fine.-Hello world, how are you?"),
+      (34,4,"Comment#4-Comment#3-Comment#2-Comment#1"),
+      (65,5,"Comment#9-Comment#8-Comment#7-Comment#6-Comment#5"),
+      (111,6,"Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10") )
+    
+    assertEquals(expected, result)
+  }
+
+  /**
    * check correctness of sorted groupReduce on custom type with keyselector sorting
    */
+  @Test
+  def testPojoKeySelectorGroupSort(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds =  CollectionDataSets.getCustomTypeDataSet(env)
+    
     val reduceDs =  ds.groupBy(_.myInt).sortGroup(_.myString, Order.DESCENDING).reduceGroup {
       in =>
         val iter = in.toIterator
@@ -642,48 +719,51 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
         o.myString = concat.toString()
         o
     }
-    reduceDs.writeAsText(resultPath)
-    env.execute()
-    expected = "1,0,Hi\n" +
-      "2,3,Hello world-Hello\n" +
-      "3,12,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
-      "4,30,Comment#4-Comment#3-Comment#2-Comment#1\n" +
-      "5,60,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
-      "6,105,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n"
-  }
-
+    
+    val result: Seq[String] = reduceDs.map(_.toString()).collect().sorted
+    
+    val expected = Seq[String]( "1,0,Hi",
+      "2,3,Hello world-Hello",
+      "3,12,Luke Skywalker-I am fine.-Hello world, how are you?",
+      "4,30,Comment#4-Comment#3-Comment#2-Comment#1",
+      "5,60,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5",
+      "6,105,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10" )
+    
+    assertEquals(expected, result)
+  }
+
+  /**
+   * check correctness of sorted groupReduce with combine on tuples with keyselector sorting
+   */
   @Test
-  def testTupleKeySelectorSortWithCombine: Unit = {
-    /*
-     * check correctness of sorted groupReduce with combine on tuples with keyselector sorting
-     */
+  def testTupleKeySelectorSortWithCombine(): Unit = {
+    
+    assumeTrue(mode != TestExecutionMode.COLLECTION)
+    
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.get3TupleDataSet(env)
 
     val reduceDs =  ds.groupBy(_._2).sortGroup(_._3, Order.DESCENDING)
-      .reduceGroup(new Tuple3SortedGroupReduceWithCombine)
-    reduceDs.writeAsCsv(resultPath)
-    env.execute()
-    if (mode == TestExecutionMode.COLLECTION) {
-      expected = null
-    } else {
-      expected = "1,Hi\n" +
-        "5,Hello world-Hello\n" +
-        "15,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
-        "34,Comment#4-Comment#3-Comment#2-Comment#1\n" +
-        "65,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
-        "111,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n"
-    }
-  }
-
+      .reduceGroup(new Tuple3SortedGroupReduceWithCombine())
+    
+    val result : Seq[(Int, String)] = reduceDs.collect().sortBy(_._1)
+    
+    val expected = Seq[(Int, String)]( (1,"Hi"),
+        (5,"Hello world-Hello"),
+        (15,"Luke Skywalker-I am fine.-Hello world, how are you?"),
+        (34,"Comment#4-Comment#3-Comment#2-Comment#1"),
+        (65,"Comment#9-Comment#8-Comment#7-Comment#6-Comment#5"),
+        (111,"Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10") )
+    
+    assertEquals(expected, result)
+  }
+
+  /**
+   * check correctness of sorted groupReduce with Tuple2 keyselector sorting
+   */
   @Test
-  def testTupleKeySelectorSortCombineOnTuple: Unit = {
-    /*
-     * check correctness of sorted groupReduceon with Tuple2 keyselector sorting
-     */
+  def testTupleKeySelectorSortCombineOnTuple(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.get5TupleDataSet(env)
 
     val reduceDs = ds.groupBy(_._1).sortGroup(t => (t._5, t._3), Order.DESCENDING).reduceGroup{
@@ -706,58 +786,57 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
         (key, sum, 0, concat.toString(), s)
       //            in.reduce((l, r) => (l._1, l._2 + r._2, 0, l._4 + "-" + r._4, l._5))
     }
-    reduceDs.writeAsCsv(resultPath)
-    env.execute()
-    expected = "1,1,0,Hallo,1\n" +
-      "2,5,0,Hallo Welt-Hallo Welt wie,1\n" +
-      "3,15,0,BCD-ABC-Hallo Welt wie gehts?,2\n" +
-      "4,34,0,FGH-CDE-EFG-DEF,1\n" +
-      "5,65,0,IJK-HIJ-KLM-JKL-GHI,1\n"
-  }
-
-
+    
+    val result: Seq[(Int, Long, Int, String, Long)] = reduceDs.collect().sortBy(_._1)
+    val expected = Seq[(Int, Long, Int, String, Long)](
+      (1,1,0,"Hallo",1),
+      (2,5,0,"Hallo Welt-Hallo Welt wie",1),
+      (3,15,0,"BCD-ABC-Hallo Welt wie gehts?",2),
+      (4,34,0,"FGH-CDE-EFG-DEF",1),
+      (5,65,0,"IJK-HIJ-KLM-JKL-GHI",1) )
+    assertEquals(expected, result)
+  }
+
+  /**
+   * Test grouping with pojo containing multiple pojos (was a bug)
+   */
   @Test
-  def testGroupingWithPojoContainingMultiplePojos: Unit = {
-    /*
-     * Test grouping with pojo containing multiple pojos (was a bug)
-     */
+  def testGroupingWithPojoContainingMultiplePojos(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
     val ds =  CollectionDataSets.getPojoWithMultiplePojos(env)
-    val reduceDs =  ds.groupBy("p2.a2")
-      .reduceGroup {
-      new GroupReduceFunction[CollectionDataSets.PojoWithMultiplePojos, String] {
-        def reduce(
-                    values: Iterable[CollectionDataSets.PojoWithMultiplePojos],
-                    out: Collector[String]) {
-          val concat: StringBuilder = new StringBuilder
-          for (value <- values.asScala) {
-            concat.append(value.p2.a2)
-          }
-          out.collect(concat.toString())
+    
+    val reduceDs =  ds.groupBy("p2.a2").reduceGroup {
+      (values, out: Collector[String]) => {
+        val concat: StringBuilder = new StringBuilder()
+        for (value <- values) {
+          concat.append(value.p2.a2)
         }
+        out.collect(concat.toString())
       }
     }
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "b\nccc\nee\n"
+    
+    val result : Seq[String] = reduceDs.map(_.toString()).collect().sorted
+    val expected = Seq[String]("b", "ccc", "ee")
+    assertEquals(expected, result)
   }
 
   @Test
-  def testWithAtomic1: Unit = {
+  def testWithAtomic1(): Unit = {
+    
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(0, 1, 1, 2)
     val reduceDs = ds.groupBy("*").reduceGroup((ints: Iterator[Int]) => ints.next())
-    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "0\n1\n2"
+    
+    val result: Seq[Int] = reduceDs.collect().sorted
+    val expected = Seq[Int](0, 1, 2)
+    assertEquals(expected, result)
   }
 }
 
 @RichGroupReduceFunction.Combinable
-class OrderCheckingCombinableReduce
-  extends RichGroupReduceFunction[MutableTuple3[Int, Long, String],
-    MutableTuple3[Int, Long, String]] {
+class OrderCheckingCombinableReduce extends
+RichGroupReduceFunction[MutableTuple3[Int, Long, String], MutableTuple3[Int, Long, String]] {
+  
   def reduce(
               values: Iterable[MutableTuple3[Int, Long, String]],
               out: Collector[MutableTuple3[Int, Long, String]]) {


Mime
View raw message