flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [70/82] [abbrv] incubator-flink git commit: Change integration tests to reuse cluster in order to save startup and shutdown time.
Date Thu, 18 Dec 2014 18:46:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
index 9cf5086..4732b58 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
@@ -21,219 +21,213 @@ import org.apache.flink.api.common.functions.RichCrossFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
+@RunWith(classOf[Parameterized])
+class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
 
-object CrossProgs {
-  var NUM_PROGRAMS: Int = 9
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * check correctness of cross on two tuple inputs
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
-        val crossDs = ds.cross(ds2) { (l, r) => (l._3 + r._3, l._4 + r._4) }
-        crossDs.writeAsCsv(resultPath)
-        env.execute()
-
-        "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1," +
-          "Hallo WeltHallo\n" + "2,Hallo WeltHallo Welt\n" + "3,Hallo WeltHallo Welt wie\n" + "2," +
-          "Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4," +
-          "Hallo Welt wieHallo Welt wie\n"
-
-      case 2 =>
-        /*
-         * check correctness of cross if UDF returns left input object
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
-        val crossDs = ds.cross(ds2) { (l, r ) => l }
-        crossDs.writeAsCsv(resultPath)
-        env.execute()
-
-        "1,1,Hi\n" + "1,1,Hi\n" + "1,1,Hi\n" + "2,2,Hello\n" + "2,2,Hello\n" + "2,2," +
-          "Hello\n" + "3,2,Hello world\n" + "3,2,Hello world\n" + "3,2,Hello world\n"
-
-      case 3 =>
-        /*
-         * check correctness of cross if UDF returns right input object
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
-        val crossDs = ds.cross(ds2) { (l, r) => r }
-        crossDs.writeAsCsv(resultPath)
-        env.execute()
-
-        "1,1,0,Hallo,1\n" + "1,1,0,Hallo,1\n" + "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt," +
-          "2\n" + "2,2,1,Hallo Welt,2\n" + "2,2,1,Hallo Welt,2\n" + "2,3,2,Hallo Welt wie," +
-          "1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,3,2,Hallo Welt wie,1\n"
-
-      case 4 =>
-        /*
-         * check correctness of cross with broadcast set
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val intDs = CollectionDataSets.getIntDataSet(env)
-        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
-        val crossDs = ds.cross(ds2).apply (
-          new RichCrossFunction[
-          (Int, Long, Int, String, Long),
-          (Int, Long, Int, String, Long),
-          (Int, Int, Int)] {
-          private var broadcast = 41
-
-          override def open(config: Configuration) {
-            val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
-            broadcast = ints.sum
-          }
-
-          override def cross(
-              first: (Int, Long, Int, String, Long),
-              second: (Int, Long, Int, String, Long)): (Int, Int, Int) = {
-            (first._1 + second._1, first._3.toInt * second._3.toInt, broadcast)
-          }
-
-        })withBroadcastSet(intDs, "ints")
-        crossDs.writeAsCsv(resultPath)
-        env.execute()
-        "2,0,55\n" + "3,0,55\n" + "3,0,55\n" + "3,0,55\n" + "4,1,55\n" + "4,2,55\n" + "3," +
-          "0,55\n" + "4,2,55\n" + "4,4,55\n"
-
-      case 5 =>
-        /*
-         * check correctness of crossWithHuge (only correctness of result -> should be the same
-         * as with normal cross)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
-        val crossDs = ds.crossWithHuge(ds2) { (l, r) => (l._3 + r._3, l._4 + r._4)}
-        crossDs.writeAsCsv(resultPath)
-        env.execute()
-        "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1," +
-          "Hallo WeltHallo\n" + "2,Hallo WeltHallo Welt\n" + "3,Hallo WeltHallo Welt wie\n" + "2," +
-          "Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4," +
-          "Hallo Welt wieHallo Welt wie\n"
-
-      case 6 =>
-        /*
-         * check correctness of crossWithTiny (only correctness of result -> should be the same
-         * as with normal cross)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets
-          .getSmall5TupleDataSet(env)
-        val ds2 = CollectionDataSets
-          .getSmall5TupleDataSet(env)
-        val crossDs = ds.crossWithTiny(ds2) { (l, r) => (l._3 + r._3, l._4 + r._4)}
-        crossDs.writeAsCsv(resultPath)
-        env.execute()
-        "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1," +
-          "Hallo WeltHallo\n" + "2,Hallo WeltHallo Welt\n" + "3,Hallo WeltHallo Welt wie\n" + "2," +
-          "Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4," +
-          "Hallo Welt wieHallo Welt wie\n"
-
-      case 7 => // 9 in Java CrossITCase
-        /*
-         * check correctness of default cross
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
-        val crossDs = ds.cross(ds2)
-        crossDs.writeAsCsv(resultPath)
-        env.execute()
-        "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" + "(1,1,Hi),(1,1,0,Hallo,1)\n" + "(1,1,Hi),(2,3," +
-          "2,Hallo Welt wie,1)\n" + "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" + "(2,2,Hello),(1,1,0," +
-          "Hallo,1)\n" + "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" + "(3,2,Hello world),(2,2,1," +
-          "Hallo Welt,2)\n" + "(3,2,Hello world),(1,1,0,Hallo,1)\n" + "(3,2,Hello world),(2,3,2," +
-          "Hallo Welt wie,1)\n"
-
-      case 8 => // 10 in Java CrossITCase
-        /*
-         * check correctness of cross on two custom type inputs
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmallCustomTypeDataSet(env)
-        val ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env)
-        val crossDs = ds.cross(ds2) {
-          (l, r) => new CustomType(l.myInt * r.myInt, l.myLong + r.myLong, l.myString + r.myString)
-        }
-        crossDs.writeAsText(resultPath)
-        env.execute()
-        "1,0,HiHi\n" + "2,1,HiHello\n" + "2,2,HiHello world\n" + "2,1,HelloHi\n" + "4,2," +
-          "HelloHello\n" + "4,3,HelloHello world\n" + "2,2,Hello worldHi\n" + "4,3," +
-          "Hello worldHello\n" + "4,4,Hello worldHello world"
-
-      case 9 => // 11 in Java CrossITCase
-        /*
-         * check correctness of cross a tuple input and a custom type input
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env)
-        val crossDs = ds.cross(ds2) {
-          (l, r) => (l._1 + r.myInt, l._3 * r.myLong, l._4 + r.myString)
-        }
-        crossDs.writeAsCsv(resultPath)
-        env.execute()
-        "2,0,HalloHi\n" + "3,0,HalloHello\n" + "3,0,HalloHello world\n" + "3,0," +
-          "Hallo WeltHi\n" + "4,1,Hallo WeltHello\n" + "4,2,Hallo WeltHello world\n" + "3,0," +
-          "Hallo Welt wieHi\n" + "4,2,Hallo Welt wieHello\n" + "4,4,Hallo Welt wieHello world\n"
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
   }
-}
 
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
 
-@RunWith(classOf[Parameterized])
-class CrossITCase(config: Configuration) extends JavaProgramTestBase(config) {
+  @Test
+  def testCorrectnessOfCrossOnTwoTupleInputs: Unit = {
+    /*
+     * check correctness of cross on two tuple inputs
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+    val crossDs = ds.cross(ds2) { (l, r) => (l._3 + r._3, l._4 + r._4) }
+    crossDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+
+    expected = "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1," +
+      "Hallo WeltHallo\n" + "2,Hallo WeltHallo Welt\n" + "3,Hallo WeltHallo Welt wie\n" + "2," +
+      "Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4," +
+      "Hallo Welt wieHallo Welt wie\n"
+  }
 
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+  @Test
+  def testCorrectnessOfCrossIfUDFReturnsLeftInput: Unit = {
+    /*
+     * check correctness of cross if UDF returns left input object
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+    val crossDs = ds.cross(ds2) { (l, r ) => l }
+    crossDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+
+    expected = "1,1,Hi\n" + "1,1,Hi\n" + "1,1,Hi\n" + "2,2,Hello\n" + "2,2,Hello\n" + "2,2," +
+      "Hello\n" + "3,2,Hello world\n" + "3,2,Hello world\n" + "3,2,Hello world\n"
+  }
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+  @Test
+  def testCorrectnessOfCrossIfUDFReturnsRightInput: Unit = {
+    /*
+     * check correctness of cross if UDF returns right input object
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+    val crossDs = ds.cross(ds2) { (l, r) => r }
+    crossDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+
+    expected = "1,1,0,Hallo,1\n" + "1,1,0,Hallo,1\n" + "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt," +
+      "2\n" + "2,2,1,Hallo Welt,2\n" + "2,2,1,Hallo Welt,2\n" + "2,3,2,Hallo Welt wie," +
+      "1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,3,2,Hallo Welt wie,1\n"
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = CrossProgs.runProgram(curProgId, resultPath)
+  @Test
+  def testCorrectnessOfCrossWithBroadcastSet: Unit = {
+    /*
+     * check correctness of cross with broadcast set
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val intDs = CollectionDataSets.getIntDataSet(env)
+    val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+    val crossDs = ds.cross(ds2).apply (
+      new RichCrossFunction[
+        (Int, Long, Int, String, Long),
+        (Int, Long, Int, String, Long),
+        (Int, Int, Int)] {
+        private var broadcast = 41
+
+        override def open(config: Configuration) {
+          val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+          broadcast = ints.sum
+        }
+
+        override def cross(
+                            first: (Int, Long, Int, String, Long),
+                            second: (Int, Long, Int, String, Long)): (Int, Int, Int) = {
+          (first._1 + second._1, first._3.toInt * second._3.toInt, broadcast)
+        }
+
+      })withBroadcastSet(intDs, "ints")
+    crossDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,0,55\n" + "3,0,55\n" + "3,0,55\n" + "3,0,55\n" + "4,1,55\n" + "4,2,55\n" + "3," +
+      "0,55\n" + "4,2,55\n" + "4,4,55\n"
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testCorrectnessOfCrossWithHuge: Unit = {
+    /*
+     * check correctness of crossWithHuge (only correctness of result -> should be the same
+     * as with normal cross)
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+    val crossDs = ds.crossWithHuge(ds2) { (l, r) => (l._3 + r._3, l._4 + r._4)}
+    crossDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1," +
+      "Hallo WeltHallo\n" + "2,Hallo WeltHallo Welt\n" + "3,Hallo WeltHallo Welt wie\n" + "2," +
+      "Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4," +
+      "Hallo Welt wieHallo Welt wie\n"
+  }
+
+  @Test
+  def testCorrectnessOfCrossWithTiny: Unit = {
+    /*
+     * check correctness of crossWithTiny (only correctness of result -> should be the same
+     * as with normal cross)
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets
+      .getSmall5TupleDataSet(env)
+    val ds2 = CollectionDataSets
+      .getSmall5TupleDataSet(env)
+    val crossDs = ds.crossWithTiny(ds2) { (l, r) => (l._3 + r._3, l._4 + r._4)}
+    crossDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1," +
+      "Hallo WeltHallo\n" + "2,Hallo WeltHallo Welt\n" + "3,Hallo WeltHallo Welt wie\n" + "2," +
+      "Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4," +
+      "Hallo Welt wieHallo Welt wie\n"
+  }
+
+  @Test
+  def testCorrectnessOfDefaultCross: Unit = {
+    /*
+     * check correctness of default cross
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+    val crossDs = ds.cross(ds2)
+    crossDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" + "(1,1,Hi),(1,1,0,Hallo,1)\n" + "(1,1,Hi),(2,3," +
+      "2,Hallo Welt wie,1)\n" + "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" + "(2,2,Hello),(1,1,0," +
+      "Hallo,1)\n" + "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" + "(3,2,Hello world),(2,2,1," +
+      "Hallo Welt,2)\n" + "(3,2,Hello world),(1,1,0,Hallo,1)\n" + "(3,2,Hello world),(2,3,2," +
+      "Hallo Welt wie,1)\n"
   }
-}
 
-object CrossITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to CrossProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
+  @Test
+  def testCorrectnessOfCrossOnTwoCutomTypeInputs: Unit = {
+    /*
+     * check correctness of cross on two custom type inputs
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmallCustomTypeDataSet(env)
+    val ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env)
+    val crossDs = ds.cross(ds2) {
+      (l, r) => new CustomType(l.myInt * r.myInt, l.myLong + r.myLong, l.myString + r.myString)
     }
+    crossDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,0,HiHi\n" + "2,1,HiHello\n" + "2,2,HiHello world\n" + "2,1,HelloHi\n" + "4,2," +
+      "HelloHello\n" + "4,3,HelloHello world\n" + "2,2,Hello worldHi\n" + "4,3," +
+      "Hello worldHello\n" + "4,4,Hello worldHello world"
+  }
 
-    configs.asJavaCollection
+  @Test
+  def testCorrectnessOfcrossTupleInputAndCustomTypeInput: Unit = {
+    /*
+     * check correctness of cross a tuple input and a custom type input
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env)
+    val crossDs = ds.cross(ds2) {
+      (l, r) => (l._1 + r.myInt, l._3 * r.myLong, l._4 + r.myString)
+    }
+    crossDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,0,HalloHi\n" + "3,0,HalloHello\n" + "3,0,HalloHello world\n" + "3,0," +
+      "Hallo WeltHi\n" + "4,1,Hallo WeltHello\n" + "4,2,Hallo WeltHello world\n" + "3,0," +
+      "Hallo Welt wieHi\n" + "4,2,Hallo Welt wieHello\n" + "4,4,Hallo Welt wieHello world\n"
   }
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index 946c425..3711347 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -19,173 +19,161 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
+@RunWith(classOf[Parameterized])
+class DistinctITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
 
-object DistinctProgs {
-  var NUM_PROGRAMS: Int = 8
-
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Check correctness of distinct on tuples with key field selector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-
-        val distinctDs = ds.union(ds).distinct(0, 1, 2)
-        distinctDs.writeAsCsv(resultPath)
-
-        env.execute()
-
-        // return expected result
-        "1,1,Hi\n" +
-          "2,2,Hello\n" +
-          "3,2,Hello world\n"
-        
-      case 2 =>
-        /*
-         * check correctness of distinct on tuples with key field selector with not all fields
-         * selected
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
-
-        val distinctDs = ds.union(ds).distinct(0).map(_._1)
-        
-        distinctDs.writeAsText(resultPath)
-        env.execute()
-        "1\n" + "2\n"
-        
-      case 3 =>
-        /*
-         * check correctness of distinct on tuples with key extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
-
-        val reduceDs = ds.union(ds).distinct(_._1).map(_._1)
-
-        reduceDs.writeAsText(resultPath)
-        env.execute()
-        "1\n" + "2\n"
-
-      case 4 =>
-        /*
-         * check correctness of distinct on custom type with type extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-
-        val reduceDs = ds.distinct(_.myInt).map( t => new Tuple1(t.myInt))
-
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-
-      case 5 =>
-        /*
-         * check correctness of distinct on tuples
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-
-        val distinctDs = ds.union(ds).distinct()
-
-        distinctDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n"
-
-      case 6 =>
-        /*
-         * check correctness of distinct on custom type with tuple-returning type extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-
-        val reduceDs = ds.distinct( t => (t._1, t._5)).map( t => (t._1, t._5) )
-
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1\n" + "2,1\n" + "2,2\n" + "3,2\n" + "3,3\n" + "4,1\n" + "4,2\n" + "5," +
-          "1\n" + "5,2\n" + "5,3\n"
-
-      case 7 =>
-        /*
-         * check correctness of distinct on tuples with field expressions
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
-
-        val reduceDs = ds.union(ds).distinct("_1").map(t => new Tuple1(t._1))
-
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1\n" + "2\n"
-
-      case 8 =>
-        /*
-         * check correctness of distinct on Pojos
-         */
-
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getDuplicatePojoDataSet(env)
-
-        val reduceDs = ds.distinct("nestedPojo.longNumber").map(_.nestedPojo.longNumber.toInt)
-
-        reduceDs.writeAsText(resultPath)
-        env.execute()
-        "10000\n20000\n30000\n"
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
   }
-}
 
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
 
-@RunWith(classOf[Parameterized])
-class DistinctITCase(config: Configuration) extends JavaProgramTestBase(config) {
+  @Test
+  def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector: Unit = {
+    /*
+     * Check correctness of distinct on tuples with key field selector
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
 
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+    val distinctDs = ds.union(ds).distinct(0, 1, 2)
+    distinctDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+    env.execute()
+
+    expected = "1,1,Hi\n" +
+      "2,2,Hello\n" +
+      "3,2,Hello world\n"
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = DistinctProgs.runProgram(curProgId, resultPath)
+  @Test
+  def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorNotAllFieldsSelected: Unit = {
+    /*
+     * check correctness of distinct on tuples with key field selector with not all fields
+     * selected
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+
+    val distinctDs = ds.union(ds).distinct(0).map(_._1)
+
+    distinctDs.writeAsText(resultPath, writeMode =  WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1\n" + "2\n"
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testCorrectnessOfDistinctOnTuplesWithKeyExtractor: Unit ={
+    /*
+     * check correctness of distinct on tuples with key extractor
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+
+    val reduceDs = ds.union(ds).distinct(_._1).map(_._1)
+
+    reduceDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1\n" + "2\n"
   }
-}
 
-object DistinctITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to DistinctProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
+  @Test
+  def testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor: Unit = {
+    /*
+     * check correctness of distinct on custom type with type extractor
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+
+    val reduceDs = ds.distinct(_.myInt).map( t => new Tuple1(t.myInt))
+
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+  }
+
+  @Test
+  def testCorrectnessOfDistinctOnTuples: Unit = {
+    /*
+     * check correctness of distinct on tuples
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
+
+    val distinctDs = ds.union(ds).distinct()
+
+    distinctDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n"
+  }
+
+  @Test
+  def testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor: Unit = {
+    /*
+     * check correctness of distinct on custom type with tuple-returning type extractor
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get5TupleDataSet(env)
+
+    val reduceDs = ds.distinct( t => (t._1, t._5)).map( t => (t._1, t._5) )
+
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1\n" + "2,1\n" + "2,2\n" + "3,2\n" + "3,3\n" + "4,1\n" + "4,2\n" + "5," +
+      "1\n" + "5,2\n" + "5,3\n"
+  }
+
+  @Test
+  def testCorrectnessOfDistinctOnTuplesWithFieldExpressions: Unit = {
+    /*
+     * check correctness of distinct on tuples with field expressions
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+
+    val reduceDs = ds.union(ds).distinct("_1").map(t => new Tuple1(t._1))
+
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1\n" + "2\n"
+  }
+
+  @Test
+  def testCorrectnessOfDistinctOnPojos: Unit = {
+    /*
+     * check correctness of distinct on Pojos
+     */
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getDuplicatePojoDataSet(env)
+
+    val reduceDs = ds.distinct("nestedPojo.longNumber").map(_.nestedPojo.longNumber.toInt)
+
+    reduceDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "10000\n20000\n30000\n"
   }
 }
 
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
index eb3ddb2..770759b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
@@ -19,7 +19,11 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.apache.flink.api.scala._
@@ -57,108 +61,94 @@ class PojoWithPojo(var myString: String, var myInt: Int, var nested: Nested) {
   override def toString = s"myString=$myString myInt=$myInt nested.myLong=${nested.myLong}"
 }
 
-object ExampleProgs {
-  var NUM_PROGRAMS: Int = 4
-
-  def runProgram(progId: Int, resultPath: String, onCollection: Boolean): String = {
-    progId match {
-      case 1 =>
-        /*
-          Test nested tuples with int offset
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.fromElements( (("this","is"), 1), (("this", "is"),2), (("this","hello"),3) )
-
-        val grouped = ds.groupBy(0).reduce( { (e1, e2) => ((e1._1._1, e1._1._2), e1._2 + e2._2)})
-        grouped.writeAsText(resultPath)
-        env.execute()
-        "((this,hello),3)\n((this,is),3)\n"
-
-      case 2 =>
-        /*
-          Test nested tuples with int offset
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.fromElements( (("this","is"), 1), (("this", "is"),2), (("this","hello"),3) )
-
-        val grouped = ds.groupBy("_1._1").reduce{
-          (e1, e2) => ((e1._1._1, e1._1._2), e1._2 + e2._2)
-        }
-        grouped.writeAsText(resultPath)
-        env.execute()
-        "((this,is),6)\n"
-
-      case 3 =>
-        /*
-          Test nested pojos
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.fromElements(
-          new PojoWithPojo("one", 1, 1L),
-          new PojoWithPojo("one", 1, 1L),
-          new PojoWithPojo("two", 666, 2L) )
-
-        val grouped = ds.groupBy("nested.myLong").reduce {
-          (p1, p2) =>
-            p1.myInt += p2.myInt
-            p1
-        }
-        grouped.writeAsText(resultPath)
-        env.execute()
-        "myString=two myInt=666 nested.myLong=2\nmyString=one myInt=2 nested.myLong=1\n"
-
-      case 4 =>
-        /*
-          Test pojo with nested case class
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.fromElements(
-          new Pojo("one", 1, 1L),
-          new Pojo("one", 1, 1L),
-          new Pojo("two", 666, 2L) )
-
-        val grouped = ds.groupBy("nested.myLong").reduce {
-          (p1, p2) =>
-            p1.myInt += p2.myInt
-            p1
-        }
-        grouped.writeAsText(resultPath)
-        env.execute()
-        "myString=two myInt=666 nested.myLong=2\nmyString=one myInt=2 nested.myLong=1\n"
-    }
-  }
-}
-
 @RunWith(classOf[Parameterized])
-class ExamplesITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
+class ExamplesITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
   private var resultPath: String = null
-  private var expectedResult: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = ExampleProgs.runProgram(curProgId, resultPath, isCollectionExecution)
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testNestesdTuplesWithIntOffset: Unit = {
+    /*
+     * Test nested tuples with int offset
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements( (("this","is"), 1), (("this", "is"),2), (("this","hello"),3) )
+
+    val grouped = ds.groupBy(0).reduce( { (e1, e2) => ((e1._1._1, e1._1._2), e1._2 + e2._2)})
+    grouped.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "((this,hello),3)\n((this,is),3)\n"
   }
-}
 
-object ExamplesITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to ExampleProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
+  @Test
+  def testNestedTuplesWithExpressionKeys: Unit = {
+    /*
+     * Test nested tuples with expression keys
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements( (("this","is"), 1), (("this", "is"),2), (("this","hello"),3) )
+
+    val grouped = ds.groupBy("_1._1").reduce{
+      (e1, e2) => ((e1._1._1, e1._1._2), e1._2 + e2._2)
+    }
+    grouped.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "((this,is),6)\n"
+  }
+
+  @Test
+  def testNestedPojos: Unit = {
+    /*
+     * Test nested pojos
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(
+      new PojoWithPojo("one", 1, 1L),
+      new PojoWithPojo("one", 1, 1L),
+      new PojoWithPojo("two", 666, 2L) )
+
+    val grouped = ds.groupBy("nested.myLong").reduce {
+      (p1, p2) =>
+        p1.myInt += p2.myInt
+        p1
     }
+    grouped.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "myString=two myInt=666 nested.myLong=2\nmyString=one myInt=2 nested.myLong=1\n"
+  }
 
-    configs.asJavaCollection
+  @Test
+  def testPojoWithNestedCaseClass: Unit = {
+    /*
+     * Test pojo with nested case class
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(
+      new Pojo("one", 1, 1L),
+      new Pojo("one", 1, 1L),
+      new Pojo("two", 666, 2L) )
+
+    val grouped = ds.groupBy("nested.myLong").reduce {
+      (p1, p2) =>
+        p1.myInt += p2.myInt
+        p1
+    }
+    grouped.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "myString=two myInt=666 nested.myLong=2\nmyString=one myInt=2 nested.myLong=1\n"
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
index 569531f..13954e8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
@@ -20,154 +20,143 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.common.functions.RichFilterFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
+@RunWith(classOf[Parameterized])
+class FilterITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
 
-object FilterProgs {
-  var NUM_PROGRAMS: Int = 7
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Test all-rejecting filter.
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val filterDs = ds.filter( t => false )
-        filterDs.writeAsCsv(resultPath)
-        env.execute()
-        "\n"
-
-      case 2 =>
-        /*
-         * Test all-passing filter.
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val filterDs = ds.filter( t => true )
-        filterDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-          "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-          "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-          "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-          "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-          "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
-      case 3 =>
-        /*
-         * Test filter on String tuple field.
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val filterDs = ds.filter( _._3.contains("world") )
-        filterDs.writeAsCsv(resultPath)
-        env.execute()
-        "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-
-      case 4 =>
-        /*
-         * Test filter on Integer tuple field.
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val filterDs = ds.filter( _._1 % 2 == 0 )
-        filterDs.writeAsCsv(resultPath)
-        env.execute()
-        "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-          "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-          "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-
-      case 5 =>
-        /*
-         * Test filter on basic type
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getStringDataSet(env)
-        val filterDs = ds.filter( _.startsWith("H") )
-        filterDs.writeAsText(resultPath)
-        env.execute()
-        "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
-
-      case 6 =>
-        /*
-         * Test filter on custom type
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val filterDs = ds.filter( _.myString.contains("a") )
-        filterDs.writeAsText(resultPath)
-        env.execute()
-        "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
-
-      case 7 =>
-        /*
-         * Test filter on String tuple field.
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ints = CollectionDataSets.getIntDataSet(env)
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val filterDs = ds.filter( new RichFilterFunction[(Int, Long, String)] {
-          var literal = -1
-          override def open(config: Configuration): Unit = {
-            val ints = getRuntimeContext.getBroadcastVariable[Int]("ints")
-            for (i <- ints.asScala) {
-              literal = if (literal < i) i else literal
-            }
-          }
-          override def filter(value: (Int, Long, String)): Boolean = value._1 < literal
-        }).withBroadcastSet(ints, "ints")
-        filterDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+  @Rule
+  def tempFolder = _tempFolder
 
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
   }
-}
 
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
 
-@RunWith(classOf[Parameterized])
-class FilterITCase(config: Configuration) extends JavaProgramTestBase(config) {
+  @Test
+  def testAllRejectingFilter: Unit = {
+    /*
+     * Test all-rejecting filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val filterDs = ds.filter( t => false )
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "\n"
+  }
 
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+  @Test
+  def testAllPassingFilter: Unit = {
+    /*
+     * Test all-passing filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val filterDs = ds.filter( t => true )
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+  @Test
+  def testFilterOnStringTupleField: Unit = {
+    /*
+     * Test filter on String tuple field.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val filterDs = ds.filter( _._3.contains("world") )
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = FilterProgs.runProgram(curProgId, resultPath)
+  @Test
+  def testFilterOnIntegerTupleField: Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val filterDs = ds.filter( _._1 % 2 == 0 )
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+      "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testFilterBasicType: Unit = {
+    /*
+     * Test filter on basic type
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+    val filterDs = ds.filter( _.startsWith("H") )
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
   }
-}
 
-object FilterITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to FilterProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
+  @Test
+  def testFilterOnCustomType: Unit = {
+    /*
+     * Test filter on custom type
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val filterDs = ds.filter( _.myString.contains("a") )
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+  }
 
-    configs.asJavaCollection
+  @Test
+  def testRichFilterOnStringTupleField: Unit = {
+    /*
+     * Test rich filter on Integer tuple field using a broadcast set.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ints = CollectionDataSets.getIntDataSet(env)
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val filterDs = ds.filter( new RichFilterFunction[(Int, Long, String)] {
+      var literal = -1
+      override def open(config: Configuration): Unit = {
+        val ints = getRuntimeContext.getBroadcastVariable[Int]("ints")
+        for (i <- ints.asScala) {
+          literal = if (literal < i) i else literal
+        }
+      }
+      override def filter(value: (Int, Long, String)): Boolean = value._1 < literal
+    }).withBroadcastSet(ints, "ints")
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
   }
-}
 
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
index 51c4691..de07cb5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
@@ -19,98 +19,75 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
-
-object FirstNProgs {
-  var NUM_PROGRAMS: Int = 3
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * First-n on ungrouped data set
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val seven = ds.first(7).map( t => new Tuple1(1) ).sum(0)
-        seven.writeAsText(resultPath)
-        env.execute()
-        "(7)\n"
-
-      case 2 =>
-        /*
-         * First-n on grouped data set
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val first = ds.groupBy(1).first(4).map( t => (t._2, 1)).groupBy(0).sum(1)
-        first.writeAsText(resultPath)
-        env.execute()
-        "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n"
-
-      case 3 =>
-        /*
-         * First-n on grouped and sorted data set
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val first = ds.groupBy(1)
-          .sortGroup(0, Order.DESCENDING)
-          .first(3)
-          .map ( t => (t._2, t._1))
-        first.writeAsText(resultPath)
-        env.execute()
-        "(1,1)\n" + "(2,3)\n(2,2)\n" + "(3,6)\n(3,5)\n(3,4)\n" + "(4,10)\n(4,9)\n(4," +
-          "8)\n" + "(5,15)\n(5,14)\n(5,13)\n" + "(6,21)\n(6,20)\n(6,19)\n"
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
-  }
-}
-
-
 @RunWith(classOf[Parameterized])
-class FirstNITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
+class FirstNITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
   private var resultPath: String = null
-  private var expectedResult: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = FirstNProgs.runProgram(curProgId, resultPath)
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testFirstNOnUngroupedDataSet: Unit = {
+    /*
+     * First-n on ungrouped data set
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val seven = ds.first(7).map( t => new Tuple1(1) ).sum(0)
+    seven.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "(7)\n"
   }
-}
 
-object FirstNITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to FirstNProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
+  @Test
+  def testFirstNOnGroupedDataSet: Unit = {
+    /*
+     * First-n on grouped data set
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val first = ds.groupBy(1).first(4).map( t => (t._2, 1)).groupBy(0).sum(1)
+    first.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n"
+  }
 
-    configs.asJavaCollection
+  @Test
+  def testFirstNOnGroupedAndSortedDataSet: Unit = {
+    /*
+     * First-n on grouped and sorted data set
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val first = ds.groupBy(1)
+      .sortGroup(0, Order.DESCENDING)
+      .first(3)
+      .map ( t => (t._2, t._1))
+    first.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "(1,1)\n" + "(2,3)\n(2,2)\n" + "(3,6)\n(3,5)\n(3,4)\n" + "(4,10)\n(4,9)\n(4," +
+      "8)\n" + "(5,15)\n(5,14)\n(5,13)\n" + "(6,21)\n(6,20)\n(6,19)\n"
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
index f3d6d75..d09ce1f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
@@ -21,199 +21,187 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
+@RunWith(classOf[Parameterized])
+class FlatMapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
 
-object FlatMapProgs {
-  var NUM_PROGRAMS: Int = 7
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Test non-passing flatmap
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getStringDataSet(env)
-        val nonPassingFlatMapDs = ds.flatMap( in => if (in.contains("banana")) Some(in) else None )
-        nonPassingFlatMapDs.writeAsText(resultPath)
-        env.execute()
-        "\n"
-        
-      case 2 =>
-        /*
-         * Test data duplicating flatmap
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getStringDataSet(env)
-        val duplicatingFlatMapDs = ds.flatMap( in => Seq(in, in.toUpperCase) )
-        duplicatingFlatMapDs.writeAsText(resultPath)
-        env.execute()
-        "Hi\n" + "HI\n" + "Hello\n" + "HELLO\n" + "Hello world\n" + "HELLO WORLD\n" +
-          "Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" + "I am fine.\n" + "I AM " +
-          "FINE.\n" + "Luke Skywalker\n" + "LUKE SKYWALKER\n" + "Random comment\n" + "RANDOM " +
-          "COMMENT\n" + "LOL\n" + "LOL\n"
-
-      case 3 =>
-        /*
-         * Test flatmap with varying number of emitted tuples
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val varyingTuplesMapDs = ds.flatMap {
-          in =>
-            val numTuples = in._1 % 3
-            (0 until numTuples) map { i => in }
-        }
-        varyingTuplesMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "2,2,Hello\n" + "4,3,Hello world, " +
-          "how are you?\n" + "5,3,I am fine.\n" + "5,3,I am fine.\n" + "7,4,Comment#1\n" + "8,4," +
-          "Comment#2\n" + "8,4,Comment#2\n" + "10,4,Comment#4\n" + "11,5,Comment#5\n" + "11,5," +
-          "Comment#5\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "14,5,Comment#8\n" + "16,6," +
-          "Comment#10\n" + "17,6,Comment#11\n" + "17,6,Comment#11\n" + "19,6,Comment#13\n" + "20," +
-          "6,Comment#14\n" + "20,6,Comment#14\n"
+  @Rule
+  def tempFolder = _tempFolder
 
-      case 4 =>
-        /*
-         * Test type conversion flatmapper (Custom -> Tuple)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val typeConversionFlatMapDs = ds.flatMap { in => Some((in.myInt, in.myLong, in.myString)) }
-        typeConversionFlatMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,0,Hi\n" + "2,1,Hello\n" + "2,2,Hello world\n" + "3,3,Hello world, " +
-          "how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + "4,6," +
-          "Comment#1\n" + "4,7,Comment#2\n" + "4,8,Comment#3\n" + "4,9,Comment#4\n" + "5,10," +
-          "Comment#5\n" + "5,11,Comment#6\n" + "5,12,Comment#7\n" + "5,13,Comment#8\n" + "5,14," +
-          "Comment#9\n" + "6,15,Comment#10\n" + "6,16,Comment#11\n" + "6,17,Comment#12\n" + "6," +
-          "18,Comment#13\n" + "6,19,Comment#14\n" + "6,20,Comment#15\n"
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
 
-      case 5 =>
-        /*
-         * Test type conversion flatmapper (Tuple -> Basic)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val typeConversionFlatMapDs = ds.flatMap ( in => Some(in._3) )
-        typeConversionFlatMapDs.writeAsText(resultPath)
-        env.execute()
-        "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
-          ".\n" + "Luke Skywalker\n" + "Comment#1\n" + "Comment#2\n" + "Comment#3\n" +
-          "Comment#4\n" + "Comment#5\n" + "Comment#6\n" + "Comment#7\n" + "Comment#8\n" +
-          "Comment#9\n" + "Comment#10\n" + "Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
-          "Comment#14\n" + "Comment#15\n"
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
 
-      case 6 =>
-        /*
-         * Test flatmapper if UDF returns input object
-         * multiple times and changes it in between
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env).map {
-          t => MutableTuple3(t._1, t._2, t._3)
-        }
-        val inputObjFlatMapDs = ds.flatMap {
-          (in, out: Collector[MutableTuple3[Int, Long, String]]) =>
-            val numTuples = in._1 % 4
-            (0 until numTuples) foreach { i => in._1 = i; out.collect(in) }
-        }
-        inputObjFlatMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "0,1,Hi\n" + "0,2,Hello\n" + "1,2,Hello\n" + "0,2,Hello world\n" + "1,2," +
-          "Hello world\n" + "2,2,Hello world\n" + "0,3,I am fine.\n" + "0,3," +
-          "Luke Skywalker\n" + "1,3,Luke Skywalker\n" + "0,4,Comment#1\n" + "1,4," +
-          "Comment#1\n" + "2,4,Comment#1\n" + "0,4,Comment#3\n" + "0,4,Comment#4\n" + "1,4," +
-          "Comment#4\n" + "0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" + "0,5," +
-          "Comment#7\n" + "0,5,Comment#8\n" + "1,5,Comment#8\n" + "0,5,Comment#9\n" + "1,5," +
-          "Comment#9\n" + "2,5,Comment#9\n" + "0,6,Comment#11\n" + "0,6,Comment#12\n" + "1,6," +
-          "Comment#12\n" + "0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" + "0,6," +
-          "Comment#15\n"
+  @Test
+  def testNonPassingFlatMap: Unit = {
+    /*
+     * Test non-passing flatmap
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+    val nonPassingFlatMapDs = ds.flatMap( in => if (in.contains("banana")) Some(in) else None )
+    nonPassingFlatMapDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "\n"
+  }
 
-      case 7 =>
-        /*
-         * Test flatmap with broadcast set
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ints = CollectionDataSets.getIntDataSet(env)
-        val ds = CollectionDataSets.get3TupleDataSet(env).map {
-          t => MutableTuple3(t._1, t._2, t._3)
-        }
-        val bcFlatMapDs = ds.flatMap(
-          new RichFlatMapFunction[MutableTuple3[Int, Long, String],
-            MutableTuple3[Int, Long, String]] {
-            private var f2Replace = 0
-            private val outTuple = MutableTuple3(0, 0L, "")
-            override def open(config: Configuration): Unit = {
-              val ints = getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
-              f2Replace = ints.sum
-            }
-            override def flatMap(
-                value: MutableTuple3[Int, Long, String],
-                out: Collector[MutableTuple3[Int, Long, String]]): Unit = {
-                outTuple._1 = f2Replace
-                outTuple._2 = value._2
-                outTuple._3 = value._3
-                out.collect(outTuple)
-            }
-          }).withBroadcastSet(ints, "ints")
-        bcFlatMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "55,1,Hi\n" + "55,2,Hello\n" + "55,2,Hello world\n" + "55,3,Hello world, " +
-          "how are you?\n" + "55,3,I am fine.\n" + "55,3,Luke Skywalker\n" + "55,4," +
-          "Comment#1\n" + "55,4,Comment#2\n" + "55,4,Comment#3\n" + "55,4,Comment#4\n" + "55,5," +
-          "Comment#5\n" + "55,5,Comment#6\n" + "55,5,Comment#7\n" + "55,5,Comment#8\n" + "55,5," +
-          "Comment#9\n" + "55,6,Comment#10\n" + "55,6,Comment#11\n" + "55,6,Comment#12\n" + "55," +
-          "6,Comment#13\n" + "55,6,Comment#14\n" + "55,6,Comment#15\n"
+  @Test
+  def testDataDuplicatingFlatMap: Unit = {
+    /*
+     * Test data duplicating flatmap
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+    val duplicatingFlatMapDs = ds.flatMap( in => Seq(in, in.toUpperCase) )
+    duplicatingFlatMapDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi\n" + "HI\n" + "Hello\n" + "HELLO\n" + "Hello world\n" + "HELLO WORLD\n" +
+      "Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" + "I am fine.\n" + "I AM " +
+      "FINE.\n" + "Luke Skywalker\n" + "LUKE SKYWALKER\n" + "Random comment\n" + "RANDOM " +
+      "COMMENT\n" + "LOL\n" + "LOL\n"
+  }
 
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
+  @Test
+  def testFlatMapWithVaryingNumberOfEmittedTuples: Unit = {
+    /*
+     * Test flatmap with varying number of emitted tuples
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val varyingTuplesMapDs = ds.flatMap {
+      in =>
+        val numTuples = in._1 % 3
+        (0 until numTuples) map { i => in }
     }
+    varyingTuplesMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "2,2,Hello\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "5,3,I am fine.\n" + "7,4,Comment#1\n" + "8,4," +
+      "Comment#2\n" + "8,4,Comment#2\n" + "10,4,Comment#4\n" + "11,5,Comment#5\n" + "11,5," +
+      "Comment#5\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "14,5,Comment#8\n" + "16,6," +
+      "Comment#10\n" + "17,6,Comment#11\n" + "17,6,Comment#11\n" + "19,6,Comment#13\n" + "20," +
+      "6,Comment#14\n" + "20,6,Comment#14\n"
   }
-}
 
-
-@RunWith(classOf[Parameterized])
-class FlatMapITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+  @Test
+  def testTypeConversionFlatMapperCustomToTuple: Unit = {
+    /*
+     * Test type conversion flatmapper (Custom -> Tuple)
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val typeConversionFlatMapDs = ds.flatMap { in => Some((in.myInt, in.myLong, in.myString)) }
+    typeConversionFlatMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,0,Hi\n" + "2,1,Hello\n" + "2,2,Hello world\n" + "3,3,Hello world, " +
+      "how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + "4,6," +
+      "Comment#1\n" + "4,7,Comment#2\n" + "4,8,Comment#3\n" + "4,9,Comment#4\n" + "5,10," +
+      "Comment#5\n" + "5,11,Comment#6\n" + "5,12,Comment#7\n" + "5,13,Comment#8\n" + "5,14," +
+      "Comment#9\n" + "6,15,Comment#10\n" + "6,16,Comment#11\n" + "6,17,Comment#12\n" + "6," +
+      "18,Comment#13\n" + "6,19,Comment#14\n" + "6,20,Comment#15\n"
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = FlatMapProgs.runProgram(curProgId, resultPath)
+  @Test
+  def testTypeConversionFlatMapperTupleToBasic: Unit = {
+    /*
+         * Test type conversion flatmapper (Tuple -> Basic)
+         */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val typeConversionFlatMapDs = ds.flatMap ( in => Some(in._3) )
+    typeConversionFlatMapDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
+      ".\n" + "Luke Skywalker\n" + "Comment#1\n" + "Comment#2\n" + "Comment#3\n" +
+      "Comment#4\n" + "Comment#5\n" + "Comment#6\n" + "Comment#7\n" + "Comment#8\n" +
+      "Comment#9\n" + "Comment#10\n" + "Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
+      "Comment#14\n" + "Comment#15\n"
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt: Unit = {
+    /*
+     * Test flatmapper if UDF returns input object
+     * multiple times and changes it in between
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).map {
+      t => MutableTuple3(t._1, t._2, t._3)
+    }
+    val inputObjFlatMapDs = ds.flatMap {
+      (in, out: Collector[MutableTuple3[Int, Long, String]]) =>
+        val numTuples = in._1 % 4
+        (0 until numTuples) foreach { i => in._1 = i; out.collect(in) }
+    }
+    inputObjFlatMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "0,1,Hi\n" + "0,2,Hello\n" + "1,2,Hello\n" + "0,2,Hello world\n" + "1,2," +
+      "Hello world\n" + "2,2,Hello world\n" + "0,3,I am fine.\n" + "0,3," +
+      "Luke Skywalker\n" + "1,3,Luke Skywalker\n" + "0,4,Comment#1\n" + "1,4," +
+      "Comment#1\n" + "2,4,Comment#1\n" + "0,4,Comment#3\n" + "0,4,Comment#4\n" + "1,4," +
+      "Comment#4\n" + "0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" + "0,5," +
+      "Comment#7\n" + "0,5,Comment#8\n" + "1,5,Comment#8\n" + "0,5,Comment#9\n" + "1,5," +
+      "Comment#9\n" + "2,5,Comment#9\n" + "0,6,Comment#11\n" + "0,6,Comment#12\n" + "1,6," +
+      "Comment#12\n" + "0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" + "0,6," +
+      "Comment#15\n"
   }
-}
 
-object FlatMapITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to FlatMapProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
+  @Test
+  def testFlatMapWithBroadcastSet: Unit = {
+    /*
+     * Test flatmap with broadcast set
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ints = CollectionDataSets.getIntDataSet(env)
+    val ds = CollectionDataSets.get3TupleDataSet(env).map {
+      t => MutableTuple3(t._1, t._2, t._3)
     }
-
-    configs.asJavaCollection
+    val bcFlatMapDs = ds.flatMap(
+      new RichFlatMapFunction[MutableTuple3[Int, Long, String],
+        MutableTuple3[Int, Long, String]] {
+        private var f2Replace = 0
+        private val outTuple = MutableTuple3(0, 0L, "")
+        override def open(config: Configuration): Unit = {
+          val ints = getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+          f2Replace = ints.sum
+        }
+        override def flatMap(
+                              value: MutableTuple3[Int, Long, String],
+                              out: Collector[MutableTuple3[Int, Long, String]]): Unit = {
+          outTuple._1 = f2Replace
+          outTuple._2 = value._2
+          outTuple._3 = value._3
+          out.collect(outTuple)
+        }
+      }).withBroadcastSet(ints, "ints")
+    bcFlatMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "55,1,Hi\n" + "55,2,Hello\n" + "55,2,Hello world\n" + "55,3,Hello world, " +
+      "how are you?\n" + "55,3,I am fine.\n" + "55,3,Luke Skywalker\n" + "55,4," +
+      "Comment#1\n" + "55,4,Comment#2\n" + "55,4,Comment#3\n" + "55,4,Comment#4\n" + "55,5," +
+      "Comment#5\n" + "55,5,Comment#6\n" + "55,5,Comment#7\n" + "55,5,Comment#8\n" + "55,5," +
+      "Comment#9\n" + "55,6,Comment#10\n" + "55,6,Comment#11\n" + "55,6,Comment#12\n" + "55," +
+      "6,Comment#13\n" + "55,6,Comment#14\n" + "55,6,Comment#15\n"
   }
 }
-


Mime
View raw message