flink-commits mailing list archives

From: rmetz...@apache.org
Subject: [09/14] [FLINK-1171] Move Scala API tests to flink-tests project
Date: Sat, 18 Oct 2014 17:34:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
deleted file mode 100644
index fe1dd43..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
-import org.junit.Assert
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.operators.Order
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-
-class GroupingTest {
-
-  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
-  private val customTypeData = Array[CustomType](new CustomType())
-  private val emptyLongData = Array[Long]()
-
-  @Test
-  def testGroupByKeyIndices1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tupleDs = env.fromCollection(emptyTupleData)
-
-    // should work
-    try {
-      tupleDs.groupBy(0)
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testGroupByKeyIndices2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val longDs = env.fromCollection(emptyLongData)
-
-    // should not work, grouping on basic type
-    longDs.groupBy(0)
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testGroupByKeyIndices3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val customDs = env.fromCollection(customTypeData)
-
-    // should not work, field position key on custom type
-    customDs.groupBy(0)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupByKeyIndices4(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tupleDs = env.fromCollection(emptyTupleData)
-
-    // should not work, field position out of range
-    tupleDs.groupBy(5)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupByKeyIndices5(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tupleDs = env.fromCollection(emptyTupleData)
-
-    // should not work, negative field position
-    tupleDs.groupBy(-1)
-  }
-
-  @Test
-  def testGroupByKeyFields1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tupleDs = env.fromCollection(emptyTupleData)
-
-    // should work
-    try {
-      tupleDs.groupBy("_1")
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupByKeyFields2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val longDs = env.fromCollection(emptyLongData)
-
-    // should not work, grouping on basic type
-    longDs.groupBy("_1")
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupByKeyFields3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val customDs = env.fromCollection(customTypeData)
-
-    // should not work, field key on custom type
-    customDs.groupBy("_1")
-  }
-
-  @Test(expected = classOf[RuntimeException])
-  def testGroupByKeyFields4(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tupleDs = env.fromCollection(emptyTupleData)
-
-    // should not work, invalid field
-    tupleDs.groupBy("foo")
-  }
-
-  @Test
-  def testGroupByKeyFields5(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val customDs = env.fromCollection(customTypeData)
-
-    // should work
-    customDs.groupBy("myInt")
-  }
-
-  @Test
-  def testGroupByKeyExpressions1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromCollection(customTypeData)
-
-    // should work
-    try {
-      ds.groupBy("myInt")
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupByKeyExpressions2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // should not work: groups on basic type
-    val longDs = env.fromCollection(emptyLongData)
-    longDs.groupBy("l")
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testGroupByKeyExpressions3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val customDs = env.fromCollection(customTypeData)
-
-    // should not work: groups on custom type
-    customDs.groupBy(0)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupByKeyExpressions4(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromCollection(customTypeData)
-
-    // should not work, non-existent field
-    ds.groupBy("myNonExistent")
-  }
-
-  @Test
-  def testGroupByKeySelector1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    try {
-      val customDs = env.fromCollection(customTypeData)
-      customDs.groupBy { _.myLong }
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test
-  def testGroupSortKeyFields1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tupleDs = env.fromCollection(emptyTupleData)
-    try {
-      tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING)
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupSortKeyFields2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tupleDs = env.fromCollection(emptyTupleData)
-
-    // should not work, field position out of range
-    tupleDs.groupBy(0).sortGroup(5, Order.ASCENDING)
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testGroupSortKeyFields3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val longDs = env.fromCollection(emptyLongData)
-    longDs.groupBy { x: Long => x }.sortGroup(0, Order.ASCENDING)
-  }
-
-  @Test
-  def testChainedGroupSortKeyFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tupleDs = env.fromCollection(emptyTupleData)
-    try {
-      tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING).sortGroup(2, Order.DESCENDING)
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-}
-
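
For reference, here is a minimal, self-contained sketch of the grouping and group-sorting API that the deleted GroupingTest exercises. It is illustrative only and not part of this commit; it assumes the 0.7-era Scala DataSet API shown above, and the object name and output path are made up.

    import org.apache.flink.api.common.operators.Order
    import org.apache.flink.api.scala._

    object GroupingSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // small tuple data set: field 0 is the grouping key, field 1 the order within a group
        val tuples = env.fromCollection(Seq((1, 2L, "b"), (1, 1L, "a"), (2, 1L, "c")))
        // group by field position, sort each group, then concatenate field 2 in sorted order
        val concatenated = tuples
          .groupBy(0)
          .sortGroup(1, Order.ASCENDING)
          .reduceGroup { (it: Iterator[(Int, Long, String)]) => it.map(_._3).mkString("-") }
        concatenated.writeAsText("/tmp/grouping-sketch") // hypothetical output path
        env.execute()
      }
    }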

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
deleted file mode 100644
index 2605830..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.common.functions.RichJoinFunction
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.apache.flink.util.Collector
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.flink.api.scala._
-
-
-object JoinProgs {
-  var NUM_PROGRAMS: Int = 20
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * UDF Join on tuples with key field positions
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val joinDs = ds1.join(ds2).where(1).equalTo(1) { (l, r) => (l._3, r._4) }
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-      
-      case 2 =>
-        /*
-         * UDF Join on tuples with multiple key field positions
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.get3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val joinDs = ds1.join(ds2).where(0, 1).equalTo(0, 4) { (l, r) => (l._3, r._4) }
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-          "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-
-      case 3 =>
-        /*
-         * Default Join on tuples
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val joinDs = ds1.join(ds2).where(0).equalTo(2)
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" + "(2,2,Hello),(2,3,2,Hallo Welt wie," +
-          "1)\n" + "(3,2,Hello world),(3,4,3,Hallo Welt wie gehts?,2)\n"
-
-      case 4 =>
-        /*
-         * Join with Huge
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val joinDs = ds1.joinWithHuge(ds2).where(1).equalTo(1) { (l, r) => (l._3, r._4) }
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-
-      case 5 =>
-        /*
-         * Join with Tiny
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val joinDs = ds1.joinWithTiny(ds2).where(1).equalTo(1) { (l, r) => (l._3, r._4) }
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-
-      case 6 =>
-        /*
-         * Join that returns the left input object
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val joinDs = ds1.join(ds2).where(1).equalTo(1) { (l, r) => l }
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n"
-
-      case 7 =>
-        /*
-         * Join that returns the right input object
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val joinDs = ds1.join(ds2).where(1).equalTo(1) { (l, r) => r }
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt,2\n" + "2,2,1,Hallo Welt,2\n"
-
-      case 8 =>
-        /*
-         * Join with broadcast set
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val intDs = CollectionDataSets.getIntDataSet(env)
-        val ds1 = CollectionDataSets.get3TupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
-        val joinDs = ds1.join(ds2).where(1).equalTo(4).apply(
-          new RichJoinFunction[
-            (Int, Long, String),
-            (Int, Long, Int, String, Long),
-            (String, String, Int)] {
-            private var broadcast = 41
-
-            override def open(config: Configuration) {
-              val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
-              broadcast = ints.sum
-            }
-
-            override def join(
-                first: (Int, Long, String),
-                second: (Int, Long, Int, String, Long)): (String, String, Int) = {
-              (first._3, second._4, broadcast)
-            }
-          }
-        ).withBroadcastSet(intDs, "ints")
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "Hi,Hallo,55\n" + "Hi,Hallo Welt wie,55\n" + "Hello,Hallo Welt," +
-          "55\n" + "Hello world,Hallo Welt,55\n"
-
-      case 9 =>
-        /*
-         * Join on a tuple input with key field selector and a custom type input with key extractor
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env)
-        val ds2 = CollectionDataSets.get3TupleDataSet(env)
-        val joinDs = ds1.join(ds2).where( _.myInt ).equalTo(0) { (l, r) => (l.myString, r._3) }
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "Hi,Hi\n" + "Hello,Hello\n" + "Hello world,Hello\n"
-
-      case 10 => // 12 in Java ITCase
-        /*
-         * Join on a tuple input with key field selector and a custom type input with key extractor
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
-        val joinDs = ds1.join(ds2).where(1).equalTo(_.myLong) apply { (l, r) => (l._3, r.myString) }
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "Hi,Hello\n" + "Hello,Hello world\n" + "Hello world,Hello world\n"
-
-      case 11 => // 13 in Java ITCase
-        /*
-         * (Default) Join on two custom type inputs with key extractors
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getCustomTypeDataSet(env)
-        val ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env)
-        val joinDs = ds1.join(ds2).where(_.myInt).equalTo(_.myInt)
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,0,Hi,1,0,Hi\n" + "2,1,Hello,2,1,Hello\n" + "2,1,Hello,2,2,Hello world\n" + "2," +
-          "2,Hello world,2,1,Hello\n" + "2,2,Hello world,2,2,Hello world\n"
-
-      case 12 => // 14 in Java ITCase
-        /*
-         * UDF Join on tuples with tuple-returning key selectors
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.get3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val joinDs = ds1.join(ds2).where( t => (t._1, t._2)).equalTo( t => (t._1, t._5)) apply {
-          (l, r) => (l._3, r._4)
-        }
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-          "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-
-      /**
-       * Joins with POJOs
-       */
-      case 13 => // 15 in Java ITCase
-        /*
-         * Join nested pojo against tuple (selected using a string)
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val joinDs = ds1.join(ds2).where("nestedPojo.longNumber").equalTo("_7")
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One," +
-          "10000)\n" + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two," +
-          "20000)\n" + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"
-
-      case 14 => // 16 in Java ITCase
-        /*
-         * Join nested pojo against tuple (selected as an integer)
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val joinDs = ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6) // <-- difference
-        joinDs.writeAsCsv(resultPath)
-        env.execute()
-        "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One," +
-          "10000)\n" + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two," +
-          "20000)\n" + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"
-
-      case 15 => // 17 in Java ITCase
-        /*
-         * selecting multiple fields using expression language
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val joinDs = ds1.join(ds2)
-          .where("nestedPojo.longNumber", "number", "str")
-          .equalTo("_7", "_1", "_2")
-        joinDs.writeAsCsv(resultPath)
-        env.setDegreeOfParallelism(1)
-        env.execute()
-        "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One," +
-          "10000)\n" + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two," +
-          "20000)\n" + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"
-
-      case 16 => // 18 in Java ITCase
-        /*
-         * nested into tuple
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val joinDs = ds1.join(ds2).where("nestedPojo.longNumber", "number",
-          "nestedTupleWithCustom._1").equalTo("_7", "_1", "_3")
-        joinDs.writeAsCsv(resultPath)
-        env.setDegreeOfParallelism(1)
-        env.execute()
-        "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One," +
-          "10000)\n" + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two," +
-          "20000)\n" + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"
-
-      case 17 => // 19 in Java ITCase
-        /*
-         * nested into tuple into pojo
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val joinDs = ds1.join(ds2)
-          .where("nestedTupleWithCustom._1",
-            "nestedTupleWithCustom._2.myInt",
-            "nestedTupleWithCustom._2.myLong")
-          .equalTo("_3", "_4", "_5")
-        joinDs.writeAsCsv(resultPath)
-        env.setDegreeOfParallelism(1)
-        env.execute()
-        "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One," +
-          "10000)\n" + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two," +
-          "20000)\n" + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"
-
-      case 18 => // 20 in Java ITCase
-        /*
-         * Non-POJO test to verify that full-tuple keys are working.
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env)
-        val joinDs = ds1.join(ds2).where(0).equalTo("_1._1", "_1._2")
-        joinDs.writeAsCsv(resultPath)
-        env.setDegreeOfParallelism(1)
-        env.execute()
-        "((1,1),one),((1,1),one)\n" + "((2,2),two),((2,2),two)\n" + "((3,3),three),((3,3)," +
-          "three)\n"
-
-      case 19 => // 21 in Java ITCase
-        /*
-         * Non-POJO test to verify "nested" tuple-element selection.
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env)
-        val ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env)
-        val joinDs = ds1.join(ds2).where("_1._1").equalTo("_1._1")
-        joinDs.writeAsCsv(resultPath)
-        env.setDegreeOfParallelism(1)
-        env.execute()
-        "((1,1),one),((1,1),one)\n" + "((2,2),two),((2,2),two)\n" + "((3,3),three),((3,3),three)\n"
-
-      case 20 => // 22 in Java ITCase
-        /*
-         * full pojo with full tuple
-         */
-        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env)
-        val joinDs = ds1.join(ds2).where("*").equalTo("*")
-        joinDs.writeAsCsv(resultPath)
-        env.setDegreeOfParallelism(1)
-        env.execute()
-        "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n" +
-        "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n" +
-        "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n"
-      
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id: " + progId)
-    }
-  }
-}
-
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
-
-  protected def testProgram(): Unit = {
-    expectedResult = JoinProgs.runProgram(curProgId, resultPath)
-  }
-
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
-
-object JoinITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to JoinProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
deleted file mode 100644
index 0219154..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException
-import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
-import org.junit.Assert
-import org.apache.flink.api.common.InvalidProgramException
-import org.junit.Ignore
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-class JoinOperatorTest {
-
-  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
-  private val customTypeData = Array[CustomType](new CustomType())
-  private val emptyLongData = Array[Long]()
-
-  @Test
-  def testJoinKeyIndices1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should work
-    try {
-      ds1.join(ds2).where(0).equalTo(0)
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testJoinKeyIndices2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should not work, incompatible key types
-    ds1.join(ds2).where(0).equalTo(2)
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testJoinKeyIndices3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should not work, non-matching number of key indices
-    ds1.join(ds2).where(0, 1).equalTo(2)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testJoinKeyIndices4(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should not work, index out of range
-    ds1.join(ds2).where(5).equalTo(0)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testJoinKeyIndices5(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should not work, negative position
-    ds1.join(ds2).where(-1).equalTo(-1)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testJoinKeyIndices6(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, key index on custom type
-    ds1.join(ds2).where(5).equalTo(0)
-  }
-
-  @Test
-  def testJoinKeyFields1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should work
-    try {
-      ds1.join(ds2).where("_1").equalTo("_1")
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testJoinKeyFields2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should not work, incompatible field types
-    ds1.join(ds2).where("_1").equalTo("_3")
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testJoinKeyFields3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should not work, non-matching number of key indices
-
-    ds1.join(ds2).where("_1", "_2").equalTo("_3")
-  }
-
-  @Test(expected = classOf[RuntimeException])
-  def testJoinKeyFields4(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should not work, non-existent key
-    ds1.join(ds2).where("foo").equalTo("_1")
-  }
-
-  @Test(expected = classOf[RuntimeException])
-  def testJoinKeyFields5(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should not work, non-existent key
-    ds1.join(ds2).where("_1").equalTo("bar")
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testJoinKeyFields6(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, field key on custom type
-    ds1.join(ds2).where("_2").equalTo("_1")
-  }
-
-  @Test
-  def testJoinKeyExpressions1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should work
-    try {
-      ds1.join(ds2).where("myInt").equalTo("myInt")
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testJoinKeyExpressions2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, incompatible join key types
-    ds1.join(ds2).where("myInt").equalTo("myString")
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testJoinKeyExpressions3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, incompatible number of keys
-    ds1.join(ds2).where("myInt", "myString").equalTo("myInt")
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testJoinKeyExpressions4(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, join key non-existent
-    ds1.join(ds2).where("myNonExistent").equalTo("i")
-  }
-
-  @Test
-  def testJoinKeySelectors1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should work
-    try {
-      ds1.join(ds2).where { _.myLong } equalTo { _.myLong }
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test
-  def testJoinKeyMixing1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // should work
-    try {
-      ds1.join(ds2).where { _.myLong }.equalTo(3)
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test
-  def testJoinKeyMixing2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should work
-    try {
-      ds1.join(ds2).where(3).equalTo { _.myLong }
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testJoinKeyMixing3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, incompatible types
-    ds1.join(ds2).where(2).equalTo { _.myLong }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testJoinKeyMixing4(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, more than one field position key
-    ds1.join(ds2).where(1, 3) equalTo { _.myLong }
-  }
-}
-
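
Likewise illustrative and not part of this commit: a small sketch of the join key declarations that the deleted JoinOperatorTest validates, mixing a tuple field position on one input with a key selector function on the other (both keys are Long, so the key types are compatible). The data and output path are made up.

    import org.apache.flink.api.scala._

    object JoinSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val orders = env.fromCollection(Seq((1L, "order-A"), (2L, "order-B")))
        val users = env.fromCollection(Seq((1L, "Alice"), (2L, "Bob")))
        // left side keyed by field position, right side by key selector, then a join UDF
        val joined = orders.join(users).where(0).equalTo(_._1).apply {
          (order, user) => (user._2, order._2)
        }
        joined.writeAsCsv("/tmp/join-sketch") // hypothetical output path
        env.execute()
      }
    }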

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
deleted file mode 100644
index 71c6b8b..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.apache.flink.util.Collector
-import org.junit.Assert
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.flink.api.scala._
-
-
-object MapProgs {
-  var NUM_PROGRAMS: Int = 9
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Test identity map with basic type
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getStringDataSet(env)
-        val identityMapDs = ds.map( t => t)
-        identityMapDs.writeAsText(resultPath)
-        env.execute()
-        "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
-          ".\n" + "Luke Skywalker\n" + "Random comment\n" + "LOL\n"
-      
-      case 2 =>
-        /*
-         * Test identity map with a tuple
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val identityMapDs = ds.map( t => t )
-        identityMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-          "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-          "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-          "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-          "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-          "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-      
-      case 3 =>
-        /*
-         * Test type conversion mapper (Custom -> Tuple)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val typeConversionMapDs = ds.map( c => (c.myInt, c.myLong, c.myString) )
-        typeConversionMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,0,Hi\n" + "2,1,Hello\n" + "2,2,Hello world\n" + "3,3,Hello world, " +
-          "how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + "4,6," +
-          "Comment#1\n" + "4,7,Comment#2\n" + "4,8,Comment#3\n" + "4,9,Comment#4\n" + "5,10," +
-          "Comment#5\n" + "5,11,Comment#6\n" + "5,12,Comment#7\n" + "5,13,Comment#8\n" + "5,14," +
-          "Comment#9\n" + "6,15,Comment#10\n" + "6,16,Comment#11\n" + "6,17,Comment#12\n" + "6," +
-          "18,Comment#13\n" + "6,19,Comment#14\n" + "6,20,Comment#15\n"
-      
-      case 4 =>
-        /*
-         * Test type conversion mapper (Tuple -> Basic)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val typeConversionMapDs = ds.map(_._3)
-        typeConversionMapDs.writeAsText(resultPath)
-        env.execute()
-        "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
-          ".\n" + "Luke Skywalker\n" + "Comment#1\n" + "Comment#2\n" + "Comment#3\n" +
-          "Comment#4\n" + "Comment#5\n" + "Comment#6\n" + "Comment#7\n" + "Comment#8\n" +
-          "Comment#9\n" + "Comment#10\n" + "Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
-          "Comment#14\n" + "Comment#15\n"
-      
-      case 5 =>
-        /*
-         * Test mapper on tuple - Increment Integer field, reorder second and third fields
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val tupleMapDs = ds.map( t => (t._1 + 1, t._3, t._2) )
-        tupleMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "2,Hi,1\n" + "3,Hello,2\n" + "4,Hello world,2\n" + "5,Hello world, how are you?," +
-          "3\n" + "6,I am fine.,3\n" + "7,Luke Skywalker,3\n" + "8,Comment#1,4\n" + "9,Comment#2," +
-          "4\n" + "10,Comment#3,4\n" + "11,Comment#4,4\n" + "12,Comment#5,5\n" + "13,Comment#6," +
-          "5\n" + "14,Comment#7,5\n" + "15,Comment#8,5\n" + "16,Comment#9,5\n" + "17,Comment#10," +
-          "6\n" + "18,Comment#11,6\n" + "19,Comment#12,6\n" + "20,Comment#13,6\n" + "21," +
-          "Comment#14,6\n" + "22,Comment#15,6\n"
-      
-      case 6 =>
-        /*
-         * Test mapper on Custom - lowercase myString
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val customMapDs = ds.map { c => c.myString = c.myString.toLowerCase; c }
-        customMapDs.writeAsText(resultPath)
-        env.execute()
-        "1,0,hi\n" + "2,1,hello\n" + "2,2,hello world\n" + "3,3,hello world, " +
-          "how are you?\n" + "3,4,i am fine.\n" + "3,5,luke skywalker\n" + "4,6," +
-          "comment#1\n" + "4,7,comment#2\n" + "4,8,comment#3\n" + "4,9,comment#4\n" + "5,10," +
-          "comment#5\n" + "5,11,comment#6\n" + "5,12,comment#7\n" + "5,13,comment#8\n" + "5,14," +
-          "comment#9\n" + "6,15,comment#10\n" + "6,16,comment#11\n" + "6,17,comment#12\n" + "6," +
-          "18,comment#13\n" + "6,19,comment#14\n" + "6,20,comment#15\n"
-      
-      case 7 =>
-        /*
-         * Test mapper if UDF returns input object - increment first field of a tuple
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env).map {
-          t => MutableTuple3(t._1, t._2, t._3)
-        }
-        val inputObjMapDs = ds.map { t => t._1 = t._1 + 1; t }
-        inputObjMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n" + "5,3,Hello world, " +
-          "how are you?\n" + "6,3,I am fine.\n" + "7,3,Luke Skywalker\n" + "8,4," +
-          "Comment#1\n" + "9,4,Comment#2\n" + "10,4,Comment#3\n" + "11,4,Comment#4\n" + "12,5," +
-          "Comment#5\n" + "13,5,Comment#6\n" + "14,5,Comment#7\n" + "15,5,Comment#8\n" + "16,5," +
-          "Comment#9\n" + "17,6,Comment#10\n" + "18,6,Comment#11\n" + "19,6,Comment#12\n" + "20," +
-          "6,Comment#13\n" + "21,6,Comment#14\n" + "22,6,Comment#15\n"
-      
-      case 8 =>
-        /*
-         * Test map with broadcast set
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ints = CollectionDataSets.getIntDataSet(env)
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val bcMapDs = ds.map(
-          new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
-            var f2Replace = 0
-            override def open(config: Configuration): Unit = {
-              val ints = getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
-              f2Replace = ints.sum
-            }
-            override def map(in: (Int, Long, String)): (Int, Long, String) = {
-              in.copy(_1 = f2Replace)
-            }
-          }).withBroadcastSet(ints, "ints")
-        bcMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "55,1,Hi\n" + "55,2,Hello\n" + "55,2,Hello world\n" + "55,3,Hello world, " +
-          "how are you?\n" + "55,3,I am fine.\n" + "55,3,Luke Skywalker\n" + "55,4," +
-          "Comment#1\n" + "55,4,Comment#2\n" + "55,4,Comment#3\n" + "55,4,Comment#4\n" + "55,5," +
-          "Comment#5\n" + "55,5,Comment#6\n" + "55,5,Comment#7\n" + "55,5,Comment#8\n" + "55,5," +
-          "Comment#9\n" + "55,6,Comment#10\n" + "55,6,Comment#11\n" + "55,6,Comment#12\n" + "55," +
-          "6,Comment#13\n" + "55,6,Comment#14\n" + "55,6,Comment#15\n"
-      
-      case 9 =>
-        /*
-         * Test passing configuration object.
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-        val conf = new Configuration
-        val testKey = "testVariable"
-        val testValue = 666
-        conf.setInteger(testKey, testValue)
-        val bcMapDs = ds.map(
-          new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
-            override def open(config: Configuration): Unit = {
-              val fromConfig = config.getInteger(testKey, -1)
-              Assert.assertEquals(testValue, fromConfig)
-            }
-            override def map(in: (Int, Long, String)): (Int, Long, String) = {
-              in
-            }
-          }).withParameters(conf)
-        bcMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world"
-      
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
-  }
-}
-
-
-@RunWith(classOf[Parameterized])
-class MapITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
-
-  protected def testProgram(): Unit = {
-    expectedResult = MapProgs.runProgram(curProgId, resultPath)
-  }
-
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
-
-object MapITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to MapProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
-  }
-}
-
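
Also illustrative only (not part of this commit): a sketch of the broadcast-set pattern used by program 8 of the deleted MapITCase. The broadcast name "ints", the data, and the output path are arbitrary.

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    import scala.collection.JavaConverters._

    object BroadcastSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val ints = env.fromCollection(Seq(1, 2, 3))
        val ds = env.fromCollection(Seq((10, "a"), (20, "b")))
        val shifted = ds.map(new RichMapFunction[(Int, String), (Int, String)] {
          private var offset = 0
          override def open(config: Configuration): Unit = {
            // the broadcast data set is available on every parallel instance
            offset = getRuntimeContext.getBroadcastVariable[Int]("ints").asScala.sum
          }
          override def map(in: (Int, String)): (Int, String) = (in._1 + offset, in._2)
        }).withBroadcastSet(ints, "ints")
        shifted.writeAsCsv("/tmp/broadcast-sketch") // hypothetical output path
        env.execute()
      }
    }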

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
deleted file mode 100644
index 07d4897..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunction}
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.flink.api.scala._
-
-
-object PartitionProgs {
-  var NUM_PROGRAMS: Int = 7
-
-  def runProgram(progId: Int, resultPath: String, onCollection: Boolean): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Test hash partition by tuple field
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-
-        val unique = ds.partitionByHash(1).mapPartition( _.map(_._2).toSet )
-
-        unique.writeAsText(resultPath)
-        env.execute()
-
-        "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-
-      case 2 =>
-        /*
-         * Test hash partition by key selector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val unique = ds.partitionByHash( _._2 ).mapPartition( _.map(_._2).toSet )
-
-        unique.writeAsText(resultPath)
-        env.execute()
-        "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-
-      case 3 =>
-        /*
-         * Test forced rebalancing
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.generateSequence(1, 3000)
-
-        val skewed = ds.filter(_ > 780)
-        val rebalanced = skewed.rebalance()
-
-        val countsInPartition = rebalanced.map( new RichMapFunction[Long, (Int, Long)] {
-          def map(in: Long) = {
-            (getRuntimeContext.getIndexOfThisSubtask, 1)
-          }
-        })
-          .groupBy(0)
-          .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
-          // round counts to mitigate runtime scheduling effects (lazy split assignment)
-          .map { in => (in._1, in._2 / 10) }
-
-        countsInPartition.writeAsText(resultPath)
-        env.execute()
-
-        val numPerPartition: Int = 2220 / env.getDegreeOfParallelism / 10
-        var result = ""
-        for (i <- 0 until env.getDegreeOfParallelism) {
-          result += "(" + i + "," + numPerPartition + ")\n"
-        }
-        result
-
-      case 4 =>
-        // Verify that mapPartition operation after repartition picks up correct
-        // DOP
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        env.setDegreeOfParallelism(1)
-
-        val unique = ds.partitionByHash(1)
-          .setParallelism(4)
-          .mapPartition( _.map(_._2).toSet )
-
-        unique.writeAsText(resultPath)
-        env.execute()
-
-        "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-
-      case 5 =>
-        // Verify that map operation after repartition picks up correct
-        // DOP
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        env.setDegreeOfParallelism(1)
-
-        val count = ds.partitionByHash(0).setParallelism(4).map(
-          new RichMapFunction[(Int, Long, String), Tuple1[Int]] {
-            var first = true
-            override def map(in: (Int, Long, String)): Tuple1[Int] = {
-              // only output one value with count 1
-              if (first) {
-                first = false
-                Tuple1(1)
-              } else {
-                Tuple1(0)
-              }
-            }
-          }).sum(0)
-
-        count.writeAsText(resultPath)
-        env.execute()
-
-        if (onCollection) "(1)\n" else "(4)\n"
-
-      case 6 =>
-        // Verify that filter operation after repartition picks up correct
-        // DOP
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        env.setDegreeOfParallelism(1)
-
-        val count = ds.partitionByHash(0).setParallelism(4).filter(
-          new RichFilterFunction[(Int, Long, String)] {
-            var first = true
-            override def filter(in: (Int, Long, String)): Boolean = {
-              // only output one value with count 1
-              if (first) {
-                first = false
-                true
-              } else {
-                false
-              }
-            }
-        })
-          .map( _ => Tuple1(1)).sum(0)
-
-        count.writeAsText(resultPath)
-        env.execute()
-
-        if (onCollection) "(1)\n" else "(4)\n"
-
-      case 7 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        env.setDegreeOfParallelism(3)
-        val ds = CollectionDataSets.getDuplicatePojoDataSet(env)
-        val uniqLongs = ds
-          .partitionByHash("nestedPojo.longNumber")
-          .setParallelism(4)
-          .mapPartition( _.map(_.nestedPojo.longNumber).toSet )
-
-        uniqLongs.writeAsText(resultPath)
-        env.execute()
-        "10000\n" + "20000\n" + "30000\n"
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
-  }
-}
-
-
-@RunWith(classOf[Parameterized])
-class PartitionITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
-
-  protected def testProgram(): Unit = {
-    expectedResult = PartitionProgs.runProgram(curProgId, resultPath, isCollectionExecution)
-  }
-
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
-
-object PartitionITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to PartitionProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
-  }
-}
-
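
One more illustrative sketch (not part of this commit): hash partitioning followed by a per-partition operation, as exercised by the deleted PartitionITCase; the data and output path are made up.

    import org.apache.flink.api.scala._

    object PartitionSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val ds = env.fromCollection(Seq((1, 1L, "a"), (2, 1L, "b"), (3, 2L, "c")))
        // hash-partition on field 1 so that equal keys end up in the same partition,
        // then emit the distinct keys seen within each partition
        val uniqueKeys = ds
          .partitionByHash(1)
          .mapPartition(_.map(_._2).toSet)
        uniqueKeys.writeAsText("/tmp/partition-sketch") // hypothetical output path
        env.execute()
      }
    }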

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
deleted file mode 100644
index 867eb81..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.common.functions.RichReduceFunction
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.flink.api.scala._
-
-
-object ReduceProgs {
-  var NUM_PROGRAMS: Int = 10
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Reduce on tuples with key field selector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val reduceDs = ds.groupBy(1)
-          .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
-
-      case 2 =>
-        /*
-         * Reduce on tuples with multiple key field selectors
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val reduceDs = ds.groupBy(4, 0)
-          .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
-          "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
-          "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
-
-      case 3 =>
-        /*
-         * Reduce on tuples with key extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val reduceDs = ds.groupBy(_._2)
-          .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
-
-      case 4 =>
-        /*
-         * Reduce on custom type with key extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val reduceDs = ds.groupBy(_.myInt)
-          .reduce { (in1, in2) =>
-          in1.myLong += in2.myLong
-          in1.myString = "Hello!"
-          in1
-        }
-        reduceDs.writeAsText(resultPath)
-        env.execute()
-        "1,0,Hi\n" + "2,3,Hello!\n" + "3,12,Hello!\n" + "4,30,Hello!\n" + "5,60," +
-          "Hello!\n" + "6,105,Hello!\n"
-
-      case 5 =>
-        /*
-         * All-reduce for tuple
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val reduceDs =
-          ds.reduce { (in1, in2) => (in1._1 + in2._1, in1._2 + in2._2, "Hello World") }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "231,91,Hello World\n"
-
-      case 6 =>
-        /*
-         * All-reduce for custom types
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val reduceDs = ds
-          .reduce { (in1, in2) =>
-          in1.myInt += in2.myInt
-          in1.myLong += in2.myLong
-          in1.myString = "Hello!"
-          in1
-        }
-        reduceDs.writeAsText(resultPath)
-        env.execute()
-        "91,210,Hello!"
-
-      case 7 =>
-        /*
-         * Reduce with broadcast set
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val intDs = CollectionDataSets.getIntDataSet(env)
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val reduceDs = ds.groupBy(1).reduce(
-          new RichReduceFunction[(Int, Long, String)] {
-            private var f2Replace = ""
-
-            override def open(config: Configuration) {
-              val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
-              f2Replace = ints.sum + ""
-            }
-
-            override def reduce(
-                in1: (Int, Long, String),
-                in2: (Int, Long, String)): (Int, Long, String) = {
-              (in1._1 + in2._1, in1._2, f2Replace)
-            }
-          }).withBroadcastSet(intDs, "ints")
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "5,2,55\n" + "15,3,55\n" + "34,4,55\n" + "65,5,55\n" + "111,6,55\n"
-
-      case 8 =>
-        /*
-         * Reduce with UDF that returns the second input object (check mutable object handling)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env).map (t => MutableTuple3(t._1, t._2, t._3))
-        val reduceDs = ds.groupBy(1).reduce(
-          new RichReduceFunction[MutableTuple3[Int, Long, String]] {
-            override def reduce(
-                in1: MutableTuple3[Int, Long, String],
-                in2: MutableTuple3[Int, Long, String]): MutableTuple3[Int, Long, String] = {
-              in2._1 = in1._1 + in2._1
-              in2._3 = "Hi again!"
-              in2
-            }
-          })
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "5,2,Hi again!\n" + "15,3,Hi again!\n" + "34,4,Hi again!\n" + "65,5," +
-          "Hi again!\n" + "111,6,Hi again!\n"
-
-      case 9 =>
-        /*
-         * Reduce with a Tuple-returning KeySelector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val reduceDs = ds.groupBy(t => (t._1, t._5))
-          .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
-          "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
-          "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
-
-      case 10 =>
-        /*
-         * Reduce on tuples with key field expressions (field name strings)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val reduceDs = ds.groupBy("_5", "_1")
-          .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
-          "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
-          "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
-
-      case id =>
-        throw new IllegalArgumentException(s"Invalid program id $id")
-    }
-  }
-}
-
-
-@RunWith(classOf[Parameterized])
-class ReduceITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
-
-  protected def testProgram(): Unit = {
-    expectedResult = ReduceProgs.runProgram(curProgId, resultPath)
-  }
-
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
-
-object ReduceITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to ReduceProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
-  }
-}
-
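
For orientation while these tests move, the core pattern ReduceITCase exercises is a grouped reduce over tuples. Below is a minimal, self-contained sketch in the same 0.7-era Scala DataSet API; it is not part of this commit, and the object name and sample tuples are made up for illustration.

import org.apache.flink.api.scala._

object GroupedReduceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Made-up stand-in for CollectionDataSets.get3TupleDataSet.
    val ds = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"))
    // Group on the second tuple field and sum the first, as program 1 above does.
    val reduceDs = ds.groupBy(1).reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
    // Print sink plus execute(), mirroring the sink-then-execute style of the tests.
    reduceDs.print()
    env.execute()
  }
}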

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
deleted file mode 100644
index 488520e..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.flink.api.scala._
-
-
-
-/**
- * These tests are copied from [[AggregateITCase]], replacing calls to aggregate with calls
- * to sum, min, and max.
- */
-object SumMinMaxProgs {
-  var NUM_PROGRAMS: Int = 3
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        // Full aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        env.setDegreeOfParallelism(10)
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-
-        val aggregateDs = ds
-          .sum(0)
-          .andMax(1)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map{ t => (t._1, t._2) }
-
-        aggregateDs.writeAsCsv(resultPath)
-
-        env.execute()
-
-        // return expected result
-        "231,6\n"
-
-      case 2 =>
-        // Grouped aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-
-        val aggregateDs = ds
-          .groupBy(1)
-          .sum(0)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map { t => (t._2, t._1) }
-
-        aggregateDs.writeAsCsv(resultPath)
-
-        env.execute()
-
-        // return expected result
-        "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-
-      case 3 =>
-        // Nested aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-
-        val aggregateDs = ds
-          .groupBy(1)
-          .min(0)
-          .min(0)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map { t => new Tuple1(t._1) }
-
-        aggregateDs.writeAsCsv(resultPath)
-
-        env.execute()
-
-        // return expected result
-        "1\n"
-
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
-  }
-}
-
-
-@RunWith(classOf[Parameterized])
-class SumMinMaxITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
-
-  protected def testProgram(): Unit = {
-    expectedResult = SumMinMaxProgs.runProgram(curProgId, resultPath)
-  }
-
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
-
-object SumMinMaxITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to SumMinMaxProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
-  }
-}
-
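
The SumMinMaxITCase above boils down to the shorthand aggregations sum, min, and max, which chain via andMax/andMin. A minimal illustrative sketch, not part of this commit, with made-up input data:

import org.apache.flink.api.scala._

object SumMinMaxSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"))
    // sum(0).andMax(1) folds the whole data set into a single tuple:
    // field 0 carries the sum, field 1 the maximum.
    val aggregateDs = ds.sum(0).andMax(1).map(t => (t._1, t._2))
    aggregateDs.print()
    env.execute()
  }
}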

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
deleted file mode 100644
index bf29b58..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.junit.Assert
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.flink.api.scala._
-
-
-object UnionProgs {
-  var NUM_PROGRAMS: Int = 3
-
-  private final val FULL_TUPLE_3_STRING: String = "1,1,Hi\n" + "2,2,Hello\n" + "3,2," +
-    "Hello world\n" + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3," +
-    "Luke Skywalker\n" + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4," +
-    "Comment#4\n" + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5," +
-    "Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6," +
-    "Comment#12\n" + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Union of two identical data sets
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
-        unionDs.writeAsCsv(resultPath)
-        env.execute()
-        FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
-
-      case 2 =>
-        /*
-         * Union of five identical data sets, built with chained union calls
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val unionDs = ds
-          .union(CollectionDataSets.get3TupleDataSet(env))
-          .union(CollectionDataSets.get3TupleDataSet(env))
-          .union(CollectionDataSets.get3TupleDataSet(env))
-          .union(CollectionDataSets.get3TupleDataSet(env))
-        unionDs.writeAsCsv(resultPath)
-        env.execute()
-        FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
-          FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
-
-      case 3 =>
-        /*
-         * Test on union with empty dataset
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        // Don't know how to make an empty result other than by filtering everything out
-        val empty = CollectionDataSets.get3TupleDataSet(env).filter( t => false )
-        val unionDs = CollectionDataSets.get3TupleDataSet(env).union(empty)
-        unionDs.writeAsCsv(resultPath)
-        env.execute()
-        FULL_TUPLE_3_STRING
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
-  }
-}
-
-
-@RunWith(classOf[Parameterized])
-class UnionITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
-
-  protected def testProgram(): Unit = {
-    expectedResult = UnionProgs.runProgram(curProgId, resultPath)
-  }
-
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
-
-object UnionITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to UnionProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
-  }
-}
-
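
UnionITCase above covers DataSet.union, which concatenates data sets of the same type and keeps duplicates. A minimal illustrative sketch, not part of this commit, with made-up input:

import org.apache.flink.api.scala._

object UnionSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val first = env.fromElements((1, "a"), (2, "b"))
    val second = env.fromElements((2, "b"), (3, "c"))
    // union keeps all elements from both inputs, including the duplicate (2, "b").
    val unionDs = first.union(second)
    unionDs.print()
    env.execute()
  }
}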

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
deleted file mode 100644
index c4d7dc8..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators.translation
-
-import org.apache.flink.api.common.Plan
-import org.apache.flink.api.common.operators.GenericDataSourceBase
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
-
-import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.scala._
-import org.junit.Assert.{assertEquals, assertTrue, fail}
-import org.junit.Test
-
-class AggregateTranslationTest {
-  @Test
-  def translateAggregate(): Unit =  {
-    try {
-      val DOP = 8
-
-      val env = ExecutionEnvironment.createLocalEnvironment(DOP)
-
-      val initialData = env.fromElements((3.141592, "foobar", 77L))
-
-      initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).print()
-
-      val p: Plan = env.createProgramPlan()
-      val sink = p.getDataSinks.iterator.next
-
-      val reducer= sink.getInput.asInstanceOf[GroupReduceOperatorBase[_, _, _]]
-
-      assertEquals(1, reducer.getKeyColumns(0).length)
-      assertEquals(0, reducer.getKeyColumns(0)(0))
-      assertEquals(-1, reducer.getDegreeOfParallelism)
-      assertTrue(reducer.isCombinable)
-      assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
-    }
-    catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Test caused an error: " + e.getMessage)
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
deleted file mode 100644
index 4a60154..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators.translation
-
-import org.apache.flink.api.common.functions.{RichCoGroupFunction, RichMapFunction,
-  RichJoinFunction}
-import org.apache.flink.api.common.operators.GenericDataSinkBase
-import org.apache.flink.api.java.operators.translation.WrappingFunction
-import org.junit.Assert.assertArrayEquals
-import org.junit.Assert.assertEquals
-import org.junit.Assert.fail
-import org.apache.flink.api.common.{InvalidProgramException, Plan}
-import org.apache.flink.api.common.aggregators.LongSumAggregator
-import org.apache.flink.api.common.operators.base.DeltaIterationBase
-import org.apache.flink.api.common.operators.base.JoinOperatorBase
-import org.apache.flink.api.common.operators.base.MapOperatorBase
-import org.junit.Test
-
-import org.apache.flink.util.Collector
-
-import org.apache.flink.api.scala._
-
-class DeltaIterationTranslationTest {
-
-  @Test
-  def testCorrectTranslation(): Unit = {
-    try {
-      val JOB_NAME = "Test JobName"
-      val ITERATION_NAME = "Test Name"
-      val BEFORE_NEXT_WORKSET_MAP = "Some Mapper"
-      val AGGREGATOR_NAME = "AggregatorName"
-      val ITERATION_KEYS = Array(2)
-      val NUM_ITERATIONS = 13
-      val DEFAULT_DOP = 133
-      val ITERATION_DOP = 77
-
-      val env = ExecutionEnvironment.getExecutionEnvironment
-      env.setDegreeOfParallelism(DEFAULT_DOP)
-
-      val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
-      val initialWorkSet = env.fromElements((1.23, "abc"))
-
-      val result = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS) {
-        (s, ws) =>
-          val wsSelfJoin = ws.map(new IdentityMapper[(Double, String)]())
-            .join(ws).where(1).equalTo(1) { (l, r) => l }
-
-          val joined = wsSelfJoin.join(s).where(1).equalTo(2).apply(new SolutionWorksetJoin)
-          (joined, joined.map(new NextWorksetMapper).name(BEFORE_NEXT_WORKSET_MAP))
-      }
-      result.name(ITERATION_NAME)
-        .setParallelism(ITERATION_DOP)
-        .registerAggregator(AGGREGATOR_NAME, new LongSumAggregator)
-
-      result.print()
-      result.writeAsText("/dev/null")
-
-      val p: Plan = env.createProgramPlan(JOB_NAME)
-      assertEquals(JOB_NAME, p.getJobName)
-      assertEquals(DEFAULT_DOP, p.getDefaultParallelism)
-      var sink1: GenericDataSinkBase[_] = null
-      var sink2: GenericDataSinkBase[_] = null
-      val sinks = p.getDataSinks.iterator
-      sink1 = sinks.next
-      sink2 = sinks.next
-
-      val iteration: DeltaIterationBase[_, _] =
-        sink1.getInput.asInstanceOf[DeltaIterationBase[_,_]]
-
-      assertEquals(iteration, sink2.getInput)
-      assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations)
-      assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields)
-      assertEquals(ITERATION_DOP, iteration.getDegreeOfParallelism)
-      assertEquals(ITERATION_NAME, iteration.getName)
-
-      val nextWorksetMapper: MapOperatorBase[_, _, _] =
-        iteration.getNextWorkset.asInstanceOf[MapOperatorBase[_, _, _]]
-      val solutionSetJoin: JoinOperatorBase[_, _, _, _] =
-        iteration.getSolutionSetDelta.asInstanceOf[JoinOperatorBase[_, _, _, _]]
-      val worksetSelfJoin: JoinOperatorBase[_, _, _, _] =
-        solutionSetJoin.getFirstInput.asInstanceOf[JoinOperatorBase[_, _, _, _]]
-      val worksetMapper: MapOperatorBase[_, _, _] =
-        worksetSelfJoin.getFirstInput.asInstanceOf[MapOperatorBase[_, _, _]]
-
-      assertEquals(classOf[IdentityMapper[_]], worksetMapper.getUserCodeWrapper.getUserCodeClass)
-
-
-      assertEquals(
-        classOf[NextWorksetMapper],
-        nextWorksetMapper.getUserCodeWrapper.getUserCodeClass)
-
-
-      if (solutionSetJoin.getUserCodeWrapper.getUserCodeObject.isInstanceOf[WrappingFunction[_]]) {
-        val wf: WrappingFunction[_] = solutionSetJoin.getUserCodeWrapper.getUserCodeObject
-          .asInstanceOf[WrappingFunction[_]]
-        assertEquals(classOf[SolutionWorksetJoin],
-          wf.getWrappedFunction.getClass)
-      }
-      else {
-        assertEquals(classOf[SolutionWorksetJoin],
-          solutionSetJoin.getUserCodeWrapper.getUserCodeClass)
-      }
-
-      assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName)
-      assertEquals(AGGREGATOR_NAME, iteration.getAggregators.getAllRegisteredAggregators.iterator
-        .next.getName)
-    }
-    catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail(e.getMessage)
-      }
-    }
-  }
-
-  @Test
-  def testRejectWhenSolutionSetKeysDontMatchJoin(): Unit = {
-    try {
-      val env = ExecutionEnvironment.getExecutionEnvironment
-
-      val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
-      val initialWorkSet = env.fromElements((1.23, "abc"))
-
-      val iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, Array(0)) {
-        (s, ws) =>
-          try {
-            ws.join(s).where(1).equalTo(2)
-            fail("Accepted invalid program.")
-          } catch {
-            case e: InvalidProgramException => // all good
-          }
-          try {
-            s.join(ws).where(2).equalTo(1)
-            fail("Accepted invalid program.")
-          } catch {
-            case e: InvalidProgramException => // all good
-          }
-          (s, ws)
-      }
-    } catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail(e.getMessage)
-      }
-    }
-  }
-
-  @Test
-  def testRejectWhenSolutionSetKeysDontMatchCoGroup(): Unit = {
-    try {
-      val env = ExecutionEnvironment.getExecutionEnvironment
-
-      val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
-      val initialWorkSet = env.fromElements((1.23, "abc"))
-
-      val iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, Array(0)) {
-        (s, ws) =>
-          try {
-            ws.coGroup(s).where(1).equalTo(2)
-            fail("Accepted invalid program.")
-          } catch {
-            case e: InvalidProgramException => // all good
-          }
-          try {
-            s.coGroup(ws).where(2).equalTo(1)
-            fail("Accepted invalid program.")
-          } catch {
-            case e: InvalidProgramException => // all good
-          }
-          (s, ws)
-      }
-    } catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail(e.getMessage)
-      }
-    }
-  }
-//
-//  @Test def testRejectWhenSolutionSetKeysDontMatchCoGroup {
-//    try {
-//      val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-//      @SuppressWarnings(Array("unchecked")) val initialSolutionSet: DataSet[Tuple3[Double, Long,
-//        String]] = env.fromElements(new Tuple3[Double, Long, String](3.44, 5L, "abc"))
-//      @SuppressWarnings(Array("unchecked")) val initialWorkSet: DataSet[Tuple2[Double,
-//        String]] = env.fromElements(new Tuple2[Double, String](1.23, "abc"))
-//      val iteration: DeltaIteration[Tuple3[Double, Long, String], Tuple2[Double,
-//        String]] = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1)
-//      try {
-//        iteration.getWorkset.coGroup(iteration.getSolutionSet).where(1).equalTo(2).`with`(
-// new DeltaIterationTranslationTest.SolutionWorksetCoGroup1)
-//        fail("Accepted invalid program.")
-//      }
-//      catch {
-//        case e: InvalidProgramException => {
-//        }
-//      }
-//      try {
-//        iteration.getSolutionSet.coGroup(iteration.getWorkset).where(2).equalTo(1).`with`(
-// new DeltaIterationTranslationTest.SolutionWorksetCoGroup2)
-//        fail("Accepted invalid program.")
-//      }
-//      catch {
-//        case e: InvalidProgramException => {
-//        }
-//      }
-//    }
-//    catch {
-//      case e: Exception => {
-//        System.err.println(e.getMessage)
-//        e.printStackTrace
-//        fail(e.getMessage)
-//      }
-//    }
-//  }
-}
-
-class SolutionWorksetJoin
-  extends RichJoinFunction[(Double, String), (Double, Long, String), (Double, Long, String)] {
-  def join(first: (Double, String), second: (Double, Long, String)): (Double, Long, String) = {
-    null
-  }
-}
-
-class NextWorksetMapper extends RichMapFunction[(Double, Long, String), (Double, String)] {
-  def map(value: (Double, Long, String)): (Double, String) = {
-    null
-  }
-}
-
-class IdentityMapper[T] extends RichMapFunction[T, T] {
-  def map(value: T): T = {
-    value
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
deleted file mode 100644
index 931d34d..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators.translation
-
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
-import org.junit.Assert
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-class DistinctTranslationTest {
-  @Test
-  def testCombinable(): Unit = {
-    try {
-      val env = ExecutionEnvironment.getExecutionEnvironment
-      val input = env.fromElements("1", "2", "1", "3")
-
-      val op = input.distinct { x => x}
-      op.print()
-
-      val p = env.createProgramPlan()
-
-      val reduceOp =
-        p.getDataSinks.iterator.next.getInput.asInstanceOf[GroupReduceOperatorBase[_, _, _]]
-
-      Assert.assertTrue(reduceOp.isCombinable)
-    }
-    catch {
-      case e: Exception => {
-        e.printStackTrace()
-        Assert.fail(e.getMessage)
-      }
-    }
-  }
-}
-

