flink-commits mailing list archives

From rmetz...@apache.org
Subject [11/14] git commit: [FLINK-1171] Move Scala API tests to flink-tests project
Date Sat, 18 Oct 2014 17:34:16 GMT
[FLINK-1171] Move Scala API tests to flink-tests project

This closes #159


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ce2936ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ce2936ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ce2936ab

Branch: refs/heads/release-0.7
Commit: ce2936ab1e864fbf9d8974b9cc83d0b33385f8bf
Parents: 5b98ceb
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Oct 17 16:46:45 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Sat Oct 18 19:32:50 2014 +0200

----------------------------------------------------------------------
 flink-scala/pom.xml                             |  23 -
 .../scala/DeltaIterationSanityCheckTest.scala   | 166 ----
 .../api/scala/ScalaAPICompletenessTest.scala    | 170 -----
 .../SemanticPropertiesTranslationTest.scala     | 194 -----
 .../scala/io/CollectionInputFormatTest.scala    | 133 ----
 .../flink/api/scala/io/CsvInputFormatTest.scala | 288 -------
 .../api/scala/operators/AggregateITCase.scala   | 141 ----
 .../scala/operators/AggregateOperatorTest.scala | 133 ----
 .../api/scala/operators/CoGroupITCase.scala     | 402 ----------
 .../scala/operators/CoGroupOperatorTest.scala   | 273 -------
 .../flink/api/scala/operators/CrossITCase.scala | 239 ------
 .../api/scala/operators/DistinctITCase.scala    | 191 -----
 .../scala/operators/DistinctOperatorTest.scala  | 154 ----
 .../api/scala/operators/ExamplesITCase.scala    | 164 ----
 .../api/scala/operators/FilterITCase.scala      | 173 -----
 .../api/scala/operators/FirstNITCase.scala      | 116 ---
 .../scala/operators/FirstNOperatorTest.scala    | 168 -----
 .../api/scala/operators/FlatMapITCase.scala     | 219 ------
 .../api/scala/operators/GroupReduceITCase.scala | 748 -------------------
 .../api/scala/operators/GroupingTest.scala      | 228 ------
 .../flink/api/scala/operators/JoinITCase.scala  | 376 ----------
 .../api/scala/operators/JoinOperatorTest.scala  | 276 -------
 .../flink/api/scala/operators/MapITCase.scala   | 242 ------
 .../api/scala/operators/PartitionITCase.scala   | 218 ------
 .../api/scala/operators/ReduceITCase.scala      | 236 ------
 .../api/scala/operators/SumMinMaxITCase.scala   | 146 ----
 .../flink/api/scala/operators/UnionITCase.scala | 126 ----
 .../translation/AggregateTranslationTest.scala  |  61 --
 .../DeltaIterationTranslationTest.scala         | 254 -------
 .../translation/DistinctTranslationTest.scala   |  51 --
 .../translation/ReduceTranslationTest.scala     | 138 ----
 .../runtime/GenericPairComparatorTest.scala     |  76 --
 .../scala/runtime/TupleComparatorILD2Test.scala |  56 --
 .../scala/runtime/TupleComparatorILD3Test.scala |  54 --
 .../runtime/TupleComparatorILDC3Test.scala      |  54 --
 .../runtime/TupleComparatorILDX1Test.scala      |  54 --
 .../runtime/TupleComparatorILDXC2Test.scala     |  54 --
 .../scala/runtime/TupleComparatorISD1Test.scala |  53 --
 .../scala/runtime/TupleComparatorISD2Test.scala |  53 --
 .../scala/runtime/TupleComparatorISD3Test.scala |  53 --
 .../api/scala/runtime/TupleSerializerTest.scala | 190 -----
 .../runtime/TupleSerializerTestInstance.scala   |  90 ---
 .../tuple/base/PairComparatorTestBase.scala     | 102 ---
 .../tuple/base/TupleComparatorTestBase.scala    |  31 -
 .../scala/types/TypeInformationGenTest.scala    | 261 -------
 .../api/scala/util/CollectionDataSets.scala     | 426 -----------
 flink-tests/pom.xml                             | 170 ++++-
 .../scala/DeltaIterationSanityCheckTest.scala   | 166 ++++
 .../api/scala/ScalaAPICompletenessTest.scala    | 170 +++++
 .../SemanticPropertiesTranslationTest.scala     | 194 +++++
 .../scala/io/CollectionInputFormatTest.scala    | 133 ++++
 .../flink/api/scala/io/CsvInputFormatTest.scala | 288 +++++++
 .../api/scala/operators/AggregateITCase.scala   | 141 ++++
 .../scala/operators/AggregateOperatorTest.scala | 133 ++++
 .../api/scala/operators/CoGroupITCase.scala     | 402 ++++++++++
 .../scala/operators/CoGroupOperatorTest.scala   | 273 +++++++
 .../flink/api/scala/operators/CrossITCase.scala | 239 ++++++
 .../api/scala/operators/DistinctITCase.scala    | 191 +++++
 .../scala/operators/DistinctOperatorTest.scala  | 154 ++++
 .../api/scala/operators/ExamplesITCase.scala    | 164 ++++
 .../api/scala/operators/FilterITCase.scala      | 173 +++++
 .../api/scala/operators/FirstNITCase.scala      | 116 +++
 .../scala/operators/FirstNOperatorTest.scala    | 168 +++++
 .../api/scala/operators/FlatMapITCase.scala     | 219 ++++++
 .../api/scala/operators/GroupReduceITCase.scala | 748 +++++++++++++++++++
 .../api/scala/operators/GroupingTest.scala      | 228 ++++++
 .../flink/api/scala/operators/JoinITCase.scala  | 376 ++++++++++
 .../api/scala/operators/JoinOperatorTest.scala  | 276 +++++++
 .../flink/api/scala/operators/MapITCase.scala   | 242 ++++++
 .../api/scala/operators/PartitionITCase.scala   | 218 ++++++
 .../api/scala/operators/ReduceITCase.scala      | 236 ++++++
 .../api/scala/operators/SumMinMaxITCase.scala   | 146 ++++
 .../flink/api/scala/operators/UnionITCase.scala | 126 ++++
 .../translation/AggregateTranslationTest.scala  |  61 ++
 .../DeltaIterationTranslationTest.scala         | 254 +++++++
 .../translation/DistinctTranslationTest.scala   |  51 ++
 .../translation/ReduceTranslationTest.scala     | 138 ++++
 .../runtime/GenericPairComparatorTest.scala     |  76 ++
 .../scala/runtime/TupleComparatorILD2Test.scala |  56 ++
 .../scala/runtime/TupleComparatorILD3Test.scala |  54 ++
 .../runtime/TupleComparatorILDC3Test.scala      |  54 ++
 .../runtime/TupleComparatorILDX1Test.scala      |  54 ++
 .../runtime/TupleComparatorILDXC2Test.scala     |  54 ++
 .../scala/runtime/TupleComparatorISD1Test.scala |  53 ++
 .../scala/runtime/TupleComparatorISD2Test.scala |  53 ++
 .../scala/runtime/TupleComparatorISD3Test.scala |  53 ++
 .../api/scala/runtime/TupleSerializerTest.scala | 190 +++++
 .../runtime/TupleSerializerTestInstance.scala   |  90 +++
 .../tuple/base/PairComparatorTestBase.scala     | 102 +++
 .../tuple/base/TupleComparatorTestBase.scala    |  31 +
 .../scala/types/TypeInformationGenTest.scala    | 261 +++++++
 .../api/scala/util/CollectionDataSets.scala     | 426 +++++++++++
 92 files changed, 8194 insertions(+), 8061 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 827c7f0..4878cba 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -41,22 +41,6 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
 		</dependency>
@@ -100,13 +84,6 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.scalatest</groupId>
-			<artifactId>scalatest_2.10</artifactId>
-			<version>2.2.0</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
 			<version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
deleted file mode 100644
index 729fda4..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import org.junit.Test
-import org.apache.flink.api.common.InvalidProgramException
-
-// Verify that the sanity checking in delta iterations works. We just
-// have a dummy job that is not meant to be executed. Only verify that
-// the join/coGroup inside the iteration is checked.
-class DeltaIterationSanityCheckTest extends Serializable {
-
-  @Test
-  def testCorrectJoinWithSolution1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
-      val result = s.join(ws).where("_1").equalTo("_1") { (l, r) => l }
-      (result, ws)
-    }
-
-    iteration.print()
-  }
-
-  @Test
-  def testCorrectJoinWithSolution2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
-      val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
-      (result, ws)
-    }
-
-    iteration.print()
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectJoinWithSolution1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
-      val result = s.join(ws).where("_2").equalTo("_2") { (l, r) => l }
-      (result, ws)
-    }
-
-    iteration.print()
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectJoinWithSolution2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
-      val result = ws.join(s).where("_2").equalTo("_2") { (l, r) => l }
-      (result, ws)
-    }
-
-    iteration.print()  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectJoinWithSolution3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
-      val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
-      (result, ws)
-    }
-
-    iteration.print()
-   }
-
-  @Test
-  def testCorrectCoGroupWithSolution1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
-      val result = s.coGroup(ws).where("_1").equalTo("_1") { (l, r) => l.min }
-      (result, ws)
-    }
-
-    iteration.print()
-  }
-
-  @Test
-  def testCorrectCoGroupWithSolution2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
-      val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
-      (result, ws)
-    }
-
-    iteration.print()
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectCoGroupWithSolution1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
-      val result = s.coGroup(ws).where("_2").equalTo("_2") { (l, r) => l.min }
-      (result, ws)
-    }
-
-    iteration.print()
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectCoGroupWithSolution2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
-      val result = ws.coGroup(s).where("_2").equalTo("_2") { (l, r) => l.min }
-      (result, ws)
-    }
-
-    iteration.print()  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectCoGroupWithSolution3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val solutionInput = env.fromElements((1, "1"))
-    val worksetInput = env.fromElements((2, "2"))
-
-    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
-      val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
-      (result, ws)
-    }
-
-    iteration.print()
-  }
-}

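The tests above all exercise one rule: iterateDelta declares the solution-set
key fields up front, and any join or coGroup against the solution set inside
the step function must be keyed on exactly those fields; otherwise plan
construction throws InvalidProgramException. A minimal sketch of the rule,
using the same API as the test (the object name DeltaIterationKeyRule is
illustrative, not part of the commit):

    import org.apache.flink.api.scala._

    object DeltaIterationKeyRule {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val solution = env.fromElements((1, "1"))
        val workset  = env.fromElements((2, "2"))

        // "_1" is declared as the solution-set key.
        val iteration = solution.iterateDelta(workset, 10, Array("_1")) { (s, ws) =>
          // OK: the join with the solution set is keyed on "_1".
          // Keying on "_2" here would fail the sanity check at plan time.
          val delta = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
          (delta, ws)
        }
        iteration.print()
      }
    }
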
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
deleted file mode 100644
index 1aad4d1..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala
-
-import java.lang.reflect.Method
-
-import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-
-import scala.language.existentials
-
-import org.junit.Assert._
-import org.junit.Test
-
-/**
- * This checks whether the Scala API is up to feature parity with the Java API. Right now is very
- * simple, it is only checked whether a method with the same name exists.
- *
- * When adding excluded methods to the lists you should give a good reason in a comment.
- *
- * Note: This is inspired by the JavaAPICompletenessChecker from Spark.
- */
-class ScalaAPICompletenessTest {
-
-  private def isExcludedByName(method: Method): Boolean = {
-    val name = method.getDeclaringClass.getName + "." + method.getName
-    val excludedNames = Seq(
-      // These are only used internally. Should be internal API but Java doesn't have
-      // private[flink].
-      "org.apache.flink.api.java.DataSet.getExecutionEnvironment",
-      "org.apache.flink.api.java.DataSet.getType",
-      "org.apache.flink.api.java.operators.Operator.getResultType",
-      "org.apache.flink.api.java.operators.Operator.getName",
-      "org.apache.flink.api.java.operators.Grouping.getDataSet",
-      "org.apache.flink.api.java.operators.Grouping.getKeys",
-      "org.apache.flink.api.java.operators.SingleInputOperator.getInput",
-      "org.apache.flink.api.java.operators.SingleInputOperator.getInputType",
-      "org.apache.flink.api.java.operators.TwoInputOperator.getInput1",
-      "org.apache.flink.api.java.operators.TwoInputOperator.getInput2",
-      "org.apache.flink.api.java.operators.TwoInputOperator.getInput1Type",
-      "org.apache.flink.api.java.operators.TwoInputOperator.getInput2Type",
-      "org.apache.flink.api.java.ExecutionEnvironment.localExecutionIsAllowed",
-      "org.apache.flink.api.java.ExecutionEnvironment.setDefaultLocalParallelism",
-
-      // This is really just a mapper, which in Scala can easily expressed as a map lambda
-      "org.apache.flink.api.java.DataSet.writeAsFormattedText",
-
-      // Exclude minBy and maxBy for now, since there is some discussion about our aggregator
-      // semantics
-      "org.apache.flink.api.java.DataSet.minBy",
-      "org.apache.flink.api.java.DataSet.maxBy",
-      "org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
-      "org.apache.flink.api.java.operators.UnsortedGrouping.maxBy"
-      
-    )
-    val excludedPatterns = Seq(
-      // We don't have project on tuples in the Scala API
-      """^org\.apache\.flink\.api.java.*project""",
-
-      // I don't want to have withParameters in the API since I consider Configuration to be
-      // deprecated. But maybe thats just me ...
-      """^org\.apache\.flink\.api.java.*withParameters""",
-
-      // These are only used internally. Should be internal API but Java doesn't have
-      // private[flink].
-      """^org\.apache\.flink\.api.java.*getBroadcastSets""",
-      """^org\.apache\.flink\.api.java.*setSemanticProperties""",
-      """^org\.apache\.flink\.api.java.*getSemanticProperties""",
-      """^org\.apache\.flink\.api.java.*getParameters""",
-
-      // Commented out for now until we have a use case for this.
-      """^org\.apache\.flink\.api.java.*runOperation""",
-
-      // Object methods
-      """^.*notify""",
-      """^.*wait""",
-      """^.*notifyAll""",
-      """^.*equals""",
-      """^.*toString""",
-      """^.*getClass""",
-      """^.*hashCode"""
-    ).map(_.r)
-    lazy val excludedByPattern =
-      excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).nonEmpty
-    name.contains("$") || excludedNames.contains(name) || excludedByPattern
-  }
-
-  private def isExcludedByInterface(method: Method): Boolean = {
-    val excludedInterfaces =
-      Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
-    def toComparisionKey(method: Method) =
-      (method.getReturnType, method.getName, method.getGenericReturnType)
-    val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
-      excludedInterfaces.contains(i.getName)
-    }
-    val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey))
-    excludedMethods.contains(toComparisionKey(method))
-  }
-
-  private def checkMethods(
-      javaClassName: String,
-      scalaClassName: String,
-      javaClass: Class[_],
-      scalaClass: Class[_]) {
-    val javaMethods = javaClass.getMethods
-      .filterNot(_.isAccessible)
-      .filterNot(isExcludedByName)
-      .filterNot(isExcludedByInterface)
-      .map(m => m.getName).toSet
-
-    val scalaMethods = scalaClass.getMethods
-      .filterNot(_.isAccessible)
-      .filterNot(isExcludedByName)
-      .filterNot(isExcludedByInterface)
-      .map(m => m.getName).toSet
-
-    val missingMethods = javaMethods -- scalaMethods
-
-    for (method <- missingMethods) {
-      fail("Method " + method + " from " + javaClass + " is missing from " + scalaClassName + ".")
-    }
-  }
-
-  @Test
-  def testCompleteness(): Unit = {
-    checkMethods("DataSet", "DataSet", classOf[JavaDataSet[_]], classOf[DataSet[_]])
-
-    checkMethods(
-      "ExecutionEnvironment", "ExecutionEnvironment",
-      classOf[org.apache.flink.api.java.ExecutionEnvironment], classOf[ExecutionEnvironment])
-
-    checkMethods("Operator", "DataSet", classOf[Operator[_, _]], classOf[DataSet[_]])
-
-    checkMethods("UnsortedGrouping", "GroupedDataSet",
-      classOf[UnsortedGrouping[_]], classOf[GroupedDataSet[_]])
-
-    checkMethods("SortedGrouping", "GroupedDataSet",
-      classOf[SortedGrouping[_]], classOf[GroupedDataSet[_]])
-
-    checkMethods("AggregateOperator", "AggregateDataSet",
-      classOf[AggregateOperator[_]], classOf[AggregateDataSet[_]])
-
-    checkMethods("SingleInputOperator", "DataSet",
-      classOf[SingleInputOperator[_, _, _]], classOf[DataSet[_]])
-
-    checkMethods("TwoInputOperator", "DataSet",
-      classOf[TwoInputOperator[_, _, _, _]], classOf[DataSet[_]])
-
-    checkMethods("SingleInputUdfOperator", "DataSet",
-      classOf[SingleInputUdfOperator[_, _, _]], classOf[DataSet[_]])
-
-    checkMethods("TwoInputUdfOperator", "DataSet",
-      classOf[TwoInputUdfOperator[_, _, _, _]], classOf[DataSet[_]])
-  }
-}

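Stripped of the Flink-specific exclusion lists, the completeness check above
reduces to comparing the public method-name sets of two classes via
reflection. A standalone sketch of that core mechanism (MethodParity and
missingMethods are illustrative names, not from the commit):

    object MethodParity {
      /** Public method names that `reference` declares but `candidate` lacks. */
      def missingMethods(reference: Class[_], candidate: Class[_]): Set[String] = {
        def names(c: Class[_]): Set[String] =
          c.getMethods.map(_.getName).filterNot(_.contains("$")).toSet
        names(reference) -- names(candidate)
      }

      def main(args: Array[String]): Unit = {
        // Shown with plain JDK types; the real test pairs e.g. the Java
        // DataSet with the Scala DataSet. Prints the empty set here.
        println(missingMethods(classOf[java.util.List[_]], classOf[java.util.ArrayList[_]]))
      }
    }
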
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
deleted file mode 100644
index 543c05a..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.functions
-
-import org.junit.Assert._
-import org.apache.flink.api.common.functions.RichJoinFunction
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.operators.{GenericDataSinkBase, SingleInputSemanticProperties}
-import org.apache.flink.api.common.operators.base.{JoinOperatorBase, MapOperatorBase}
-import org.apache.flink.api.common.operators.util.FieldSet
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-/**
- * This is a minimal test to verify that semantic annotations are evaluated against
- * the type information properly translated correctly to the common data flow API.
- *
- * This covers only the constant fields annotations currently !!!
- */
-class SemanticPropertiesTranslationTest {
-  /**
-   * A mapper that preserves all fields over a tuple data set.
-   */
-  @Test
-  def translateUnaryFunctionAnnotationTuplesWildCard(): Unit = {
-    try {
-      val env = ExecutionEnvironment.getExecutionEnvironment
-
-      val input = env.fromElements((3L, "test", 42))
-      input.map(new WildcardConstantMapper[(Long, String, Int)]).print()
-
-      val plan = env.createProgramPlan()
-
-      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
-
-      val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
-
-      val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties
-      val fw1: FieldSet = semantics.getForwardedField(0)
-      val fw2: FieldSet = semantics.getForwardedField(1)
-      val fw3: FieldSet = semantics.getForwardedField(2)
-
-      assertNotNull(fw1)
-      assertNotNull(fw2)
-      assertNotNull(fw3)
-      assertTrue(fw1.contains(0))
-      assertTrue(fw2.contains(1))
-      assertTrue(fw3.contains(2))
-    } catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Exception in test: " + e.getMessage)
-      }
-    }
-  }
-
-  /**
-   * A mapper that preserves fields 0, 1, 2 of a tuple data set.
-   */
-  @Test
-  def translateUnaryFunctionAnnotationTuples(): Unit = {
-    try {
-      val env = ExecutionEnvironment.getExecutionEnvironment
-
-      val input = env.fromElements((3L, "test", 42))
-      input.map(new IndividualConstantMapper[Long, String, Int]).print()
-
-      val plan = env.createProgramPlan()
-
-      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
-
-      val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
-
-      val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties
-      val fw1: FieldSet = semantics.getForwardedField(0)
-      val fw2: FieldSet = semantics.getForwardedField(1)
-      val fw3: FieldSet = semantics.getForwardedField(2)
-
-      assertNotNull(fw1)
-      assertNotNull(fw2)
-      assertNotNull(fw3)
-      assertTrue(fw1.contains(0))
-      assertTrue(fw2.contains(1))
-      assertTrue(fw3.contains(2))
-    } catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Exception in test: " + e.getMessage)
-      }
-    }
-  }
-
-  /**
-   * A join that preserves tuple fields from both sides.
-   */
-  @Test
-  def translateBinaryFunctionAnnotationTuples(): Unit = {
-    try {
-      val env = ExecutionEnvironment.getExecutionEnvironment
-
-      val input1 = env.fromElements((3L, "test"))
-      val input2 = env.fromElements((3L, 3.1415))
-
-      input1.join(input2).where(0).equalTo(0)(
-        new ForwardingTupleJoin[Long, String, Long, Double]).print()
-
-      val plan = env.createProgramPlan()
-      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
-
-      val join: JoinOperatorBase[_, _, _, _] =
-        sink.getInput.asInstanceOf[JoinOperatorBase[_, _, _, _]]
-
-      val semantics = join.getSemanticProperties
-      val fw11: FieldSet = semantics.getForwardedField1(0)
-      val fw12: FieldSet = semantics.getForwardedField1(1)
-      val fw21: FieldSet = semantics.getForwardedField2(0)
-      val fw22: FieldSet = semantics.getForwardedField2(1)
-
-      assertNull(fw11)
-      assertNull(fw21)
-      assertNotNull(fw12)
-      assertNotNull(fw22)
-      assertTrue(fw12.contains(0))
-      assertTrue(fw22.contains(1))
-    }
-    catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Exception in test: " + e.getMessage)
-      }
-    }
-  }
-}
-
-
-@ConstantFields(Array("*"))
-class WildcardConstantMapper[T] extends RichMapFunction[T, T] {
-  def map(value: T): T = {
-    value
-  }
-}
-
-@ConstantFields(Array("0->0;1->1;2->2"))
-class IndividualConstantMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] {
-  def map(value: (X, Y, Z)): (X, Y, Z) = {
-    value
-  }
-}
-
-@ConstantFields(Array("0"))
-class ZeroConstantMapper[T] extends RichMapFunction[T, T] {
-  def map(value: T): T = {
-    value
-  }
-}
-
-@ConstantFieldsFirst(Array("1 -> 0"))
-@ConstantFieldsSecond(Array("1 -> 1"))
-class ForwardingTupleJoin[A, B, C, D] extends RichJoinFunction[(A, B),  (C, D), (B, D)] {
-  def join(first: (A, B), second: (C, D)): (B, D) = {
-    (first._2, second._2)
-  }
-}
-
-@ConstantFieldsFirst(Array("0 -> 0"))
-@ConstantFieldsSecond(Array("0 -> 1"))
-class ForwardingBasicJoin[A, B] extends RichJoinFunction[A, B, (A, B)] {
-  def join(first: A, second: B): (A, B) = {
-    (first, second)
-  }
-}
-

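The annotations used above encode forwarded-field information: "0" declares
that field 0 passes through unchanged, and "1 -> 0" on ConstantFieldsFirst
declares that field 1 of the first join input arrives as field 0 of the
output. A minimal sketch of a mapper carrying such an annotation, following
the pattern of the classes above (KeyPreservingMapper is an illustrative
name, not from the commit):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields

    // Declares that input field 0 (the Long key) survives untouched as output
    // field 0, which the optimizer can exploit; field 1 is replaced.
    @ConstantFields(Array("0"))
    class KeyPreservingMapper extends RichMapFunction[(Long, String), (Long, Int)] {
      def map(value: (Long, String)): (Long, Int) = (value._1, value._2.length)
    }
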
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
deleted file mode 100644
index a81dc45..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.io
-
-import org.apache.flink.api.java.io.CollectionInputFormat
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertNotNull
-import org.junit.Assert.assertTrue
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.core.io.GenericInputSplit
-import org.junit.Test
-import java.io.ByteArrayInputStream
-import java.io.ByteArrayOutputStream
-import java.io.ObjectInputStream
-import java.io.ObjectOutputStream
-import org.apache.flink.api.scala._
-import scala.collection.JavaConverters._
-
-class ElementType(val id: Int) {
-  def this() {
-    this(-1)
-  }
-
-  override def equals(obj: Any): Boolean = {
-    if (obj != null && obj.isInstanceOf[ElementType]) {
-      val et = obj.asInstanceOf[ElementType]
-      et.id == this.id
-    }
-    else {
-      false
-    }
-  }
-}
-
-class CollectionInputFormatTest {
-
-  @Test
-  def testSerializability(): Unit = {
-
-    val inputCollection = Seq(new ElementType(1), new ElementType(2), new ElementType(3))
-    val info = createTypeInformation[ElementType]
-
-    val inputFormat: CollectionInputFormat[ElementType] = new
-        CollectionInputFormat[ElementType](inputCollection.asJava, info.createSerializer())
-
-    val buffer = new ByteArrayOutputStream
-    val out = new ObjectOutputStream(buffer)
-
-    out.writeObject(inputFormat)
-
-    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
-    val serializationResult: AnyRef = in.readObject
-
-    assertNotNull(serializationResult)
-    assertTrue(serializationResult.isInstanceOf[CollectionInputFormat[_]])
-
-    val result = serializationResult.asInstanceOf[CollectionInputFormat[ElementType]]
-    val inputSplit = new GenericInputSplit
-    inputFormat.open(inputSplit)
-    result.open(inputSplit)
-
-    while (!inputFormat.reachedEnd && !result.reachedEnd) {
-      val expectedElement = inputFormat.nextRecord(null)
-      val actualElement = result.nextRecord(null)
-      assertEquals(expectedElement, actualElement)
-    }
-  }
-
-  @Test
-  def testSerializabilityStrings(): Unit = {
-    val data = Seq("To bey or not to be,--that is the question:--",
-      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
-      "Or to take arms against a sea of troubles,", "And by opposing end them?--To die," +
-        "--to sleep,--", "No more; and by a sleep to say we end", "The heartache, " +
-        "and the thousand natural shocks", "That flesh is heir to,--'tis a consummation",
-      "Devoutly to be wish'd. To die,--to sleep;--", "To sleep! perchance to dream:--ay, " +
-        "there's the rub;", "For in that sleep of death what dreams may come,",
-      "When we have shuffled off this mortal coil,", "Must give us pause: there's the respect",
-      "That makes calamity of so long life;", "For who would bear the whips and scorns of time,",
-      "The oppressor's wrong, the proud man's contumely,", "The pangs of despis'd love, " +
-        "the law's delay,", "The insolence of office, and the spurns",
-      "That patient merit of the unworthy takes,", "When he himself might his quietus make",
-      "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary " +
-        "life,", "But that the dread of something after death,--", "The undiscover'd country, " +
-        "from whose bourn", "No traveller returns,--puzzles the will,",
-      "And makes us rather bear those ills we have", "Than fly to others that we know not of?",
-      "Thus conscience does make cowards of us all;", "And thus the native hue of resolution",
-      "Is sicklied o'er with the pale cast of thought;", "And enterprises of great pith and " +
-        "moment,", "With this regard, their currents turn awry,", "And lose the name of action" +
-        ".--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons",
-      "Be all my sins remember'd.")
-
-    val inputFormat = new CollectionInputFormat[String](
-      data.asJava,
-      BasicTypeInfo.STRING_TYPE_INFO.createSerializer)
-    val baos = new ByteArrayOutputStream
-    val oos = new ObjectOutputStream(baos)
-
-    oos.writeObject(inputFormat)
-    oos.close()
-
-    val bais = new ByteArrayInputStream(baos.toByteArray)
-    val ois = new ObjectInputStream(bais)
-    val result: AnyRef = ois.readObject
-
-    assertTrue(result.isInstanceOf[CollectionInputFormat[_]])
-    var i: Int = 0
-    val in = result.asInstanceOf[CollectionInputFormat[String]]
-    in.open(new GenericInputSplit)
-
-    while (!in.reachedEnd) {
-      assertEquals(data(i), in.nextRecord(""))
-      i += 1
-    }
-    assertEquals(data.length, i)
-  }
-}
-

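Both tests rely on the same Java-serialization round trip: write the input
format to an in-memory buffer, read it back, and check that the copy behaves
like the original, mirroring what happens when the format is shipped to a
task manager. The pattern factored into a reusable helper (a sketch; the
commit itself keeps the steps inline):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      ObjectInputStream, ObjectOutputStream}

    object SerializationRoundTrip {
      /** Serializes `value` to bytes and deserializes it back. */
      def roundTrip[T <: AnyRef](value: T): T = {
        val buffer = new ByteArrayOutputStream
        val out = new ObjectOutputStream(buffer)
        out.writeObject(value)
        out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
        in.readObject().asInstanceOf[T]
      }

      def main(args: Array[String]): Unit = {
        assert(roundTrip("to be, or not to be") == "to be, or not to be")
      }
    }
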
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
deleted file mode 100644
index 9c90788..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.io
-
-import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertNotNull
-import org.junit.Assert.assertNull
-import org.junit.Assert.assertTrue
-import org.junit.Assert.fail
-import java.io.File
-import java.io.FileOutputStream
-import java.io.FileWriter
-import java.io.OutputStreamWriter
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.FileInputSplit
-import org.apache.flink.core.fs.Path
-import org.junit.Test
-import org.apache.flink.api.scala._
-
-class CsvInputFormatTest {
-
-  private final val PATH: Path = new Path("an/ignored/file/")
-  private final val FIRST_PART: String = "That is the first part"
-  private final val SECOND_PART: String = "That is the second part"
-
-  @Test
-  def readStringFields():Unit = {
-    try {
-      val fileContent = "abc|def|ghijk\nabc||hhg\n|||"
-      val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(String, String, String)](
-        PATH, createTypeInformation[(String, String, String)])
-      format.setDelimiter("\n")
-      format.setFieldDelimiter('|')
-      val parameters = new Configuration
-      format.configure(parameters)
-      format.open(split)
-      var result: (String, String, String) = null
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("abc", result._1)
-      assertEquals("def", result._2)
-      assertEquals("ghijk", result._3)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("abc", result._1)
-      assertEquals("", result._2)
-      assertEquals("hhg", result._3)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("", result._1)
-      assertEquals("", result._2)
-      assertEquals("", result._3)
-      result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
-    }
-    catch {
-      case ex: Exception => {
-        ex.printStackTrace()
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-      }
-    }
-  }
-
-  @Test
-  def readStringFieldsWithTrailingDelimiters(): Unit = {
-    try {
-      val fileContent = "abc|def|ghijk\nabc||hhg\n|||\n"
-      val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(String, String, String)](
-        PATH, createTypeInformation[(String, String, String)])
-      format.setDelimiter("\n")
-      format.setFieldDelimiter('|')
-      val parameters = new Configuration
-      format.configure(parameters)
-      format.open(split)
-      var result: (String, String, String) = null
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("abc", result._1)
-      assertEquals("def", result._2)
-      assertEquals("ghijk", result._3)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("abc", result._1)
-      assertEquals("", result._2)
-      assertEquals("hhg", result._3)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals("", result._1)
-      assertEquals("", result._2)
-      assertEquals("", result._3)
-      result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
-    }
-    catch {
-      case ex: Exception =>
-        ex.printStackTrace()
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-    }
-  }
-
-  @Test
-  def testIntegerFields(): Unit = {
-    try {
-      val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"
-      val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(Int, Int, Int, Int, Int)](
-        PATH, createTypeInformation[(Int, Int, Int, Int, Int)])
-      format.setFieldDelimiter('|')
-      format.configure(new Configuration)
-      format.open(split)
-      var result: (Int, Int, Int, Int, Int) = null
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(111), result._1)
-      assertEquals(Integer.valueOf(222), result._2)
-      assertEquals(Integer.valueOf(333), result._3)
-      assertEquals(Integer.valueOf(444), result._4)
-      assertEquals(Integer.valueOf(555), result._5)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(666), result._1)
-      assertEquals(Integer.valueOf(777), result._2)
-      assertEquals(Integer.valueOf(888), result._3)
-      assertEquals(Integer.valueOf(999), result._4)
-      assertEquals(Integer.valueOf(000), result._5)
-      result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
-    }
-    catch {
-      case ex: Exception =>
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-    }
-  }
-
-  @Test
-  def testReadFirstN(): Unit = {
-    try {
-      val fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n"
-      val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(Int, Int)](PATH, createTypeInformation[(Int, Int)])
-      format.setFieldDelimiter('|')
-      format.configure(new Configuration)
-      format.open(split)
-      var result: (Int, Int) = null
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(111), result._1)
-      assertEquals(Integer.valueOf(222), result._2)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(666), result._1)
-      assertEquals(Integer.valueOf(777), result._2)
-      result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
-    }
-    catch {
-      case ex: Exception =>
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-    }
-  }
-
-  @Test
-  def testReadSparseWithPositionSetter(): Unit = {
-    try {
-      val fileContent: String = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666" +
-        "|555|444|333|222|111|"
-      val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(Int, Int, Int)](
-        PATH,
-        createTypeInformation[(Int, Int, Int)])
-      format.setFieldDelimiter('|')
-      format.setFields(Array(0, 3, 7), Array(classOf[Integer], classOf[Integer], classOf[Integer]))
-      format.configure(new Configuration)
-      format.open(split)
-      var result: (Int, Int, Int) = null
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(111), result._1)
-      assertEquals(Integer.valueOf(444), result._2)
-      assertEquals(Integer.valueOf(888), result._3)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(000), result._1)
-      assertEquals(Integer.valueOf(777), result._2)
-      assertEquals(Integer.valueOf(333), result._3)
-      result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
-    }
-    catch {
-      case ex: Exception =>
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-    }
-  }
-
-  @Test
-  def testReadSparseWithShuffledPositions(): Unit = {
-    try {
-      val format = new ScalaCsvInputFormat[(Int, Int, Int)](
-        PATH,
-        createTypeInformation[(Int, Int, Int)])
-      format.setFieldDelimiter('|')
-      try {
-        format.setFields(Array(8, 1, 3), Array(classOf[Integer],classOf[Integer],classOf[Integer]))
-        fail("Input sequence should have been rejected.")
-      }
-      catch {
-        case e: IllegalArgumentException => // ignore
-      }
-    }
-    catch {
-      case ex: Exception =>
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-    }
-  }
-
-  private def createTempFile(content: String): FileInputSplit = {
-    val tempFile = File.createTempFile("test_contents", "tmp")
-    tempFile.deleteOnExit()
-    val wrt = new FileWriter(tempFile)
-    wrt.write(content)
-    wrt.close()
-    new FileInputSplit(0, new Path(tempFile.toURI.toString), 0,
-      tempFile.length,Array[String]("localhost"))
-  }
-
-  @Test
-  def testWindowsLineEndRemoval(): Unit = {
-    this.testRemovingTrailingCR("\n", "\n")
-    this.testRemovingTrailingCR("\r\n", "\r\n")
-    this.testRemovingTrailingCR("\r\n", "\n")
-  }
-
-  private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) {
-    var tempFile: File = null
-    val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile
-    try {
-      tempFile = File.createTempFile("CsvInputFormatTest", "tmp")
-      tempFile.deleteOnExit()
-      tempFile.setWritable(true)
-      val wrt = new OutputStreamWriter(new FileOutputStream(tempFile))
-      wrt.write(fileContent)
-      wrt.close()
-      val inputFormat = new ScalaCsvInputFormat[Tuple1[String]](new Path(tempFile.toURI.toString),
-        createTypeInformation[Tuple1[String]])
-      val parameters = new Configuration
-      inputFormat.configure(parameters)
-      inputFormat.setDelimiter(lineBreakerSetup)
-      val splits = inputFormat.createInputSplits(1)
-      inputFormat.open(splits(0))
-      var result = inputFormat.nextRecord(null)
-      assertNotNull("Expecting to not return null", result)
-      assertEquals(FIRST_PART, result._1)
-      result = inputFormat.nextRecord(result)
-      assertNotNull("Expecting to not return null", result)
-      assertEquals(SECOND_PART, result._1)
-    }
-    catch {
-      case t: Throwable =>
-        System.err.println("test failed with exception: " + t.getMessage)
-        t.printStackTrace(System.err)
-        fail("Test erroneous")
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
deleted file mode 100644
index ad22fa6..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.flink.api.scala._
-
-
-object AggregateProgs {
-  var NUM_PROGRAMS: Int = 3
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        // Full aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        env.setDegreeOfParallelism(10)
-//        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-
-        val aggregateDs = ds
-          .aggregate(Aggregations.SUM,0)
-          .and(Aggregations.MAX, 1)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map{ t => (t._1, t._2) }
-
-        aggregateDs.writeAsCsv(resultPath)
-
-        env.execute()
-
-        // return expected result
-        "231,6\n"
-
-      case 2 =>
-        // Grouped aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-
-        val aggregateDs = ds
-          .groupBy(1)
-          .aggregate(Aggregations.SUM, 0)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map { t => (t._2, t._1) }
-
-        aggregateDs.writeAsCsv(resultPath)
-
-        env.execute()
-
-        // return expected result
-        "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-
-      case 3 =>
-        // Nested aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-
-        val aggregateDs = ds
-          .groupBy(1)
-          .aggregate(Aggregations.MIN, 0)
-          .aggregate(Aggregations.MIN, 0)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map { t => new Tuple1(t._1) }
-
-        aggregateDs.writeAsCsv(resultPath)
-
-        env.execute()
-
-        // return expected result
-        "1\n"
-
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
-  }
-}
-
-
-@RunWith(classOf[Parameterized])
-class AggregateITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
-
-  protected def testProgram(): Unit = {
-    expectedResult = AggregateProgs.runProgram(curProgId, resultPath)
-  }
-
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
-
-object AggregateITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to AggregateProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala
deleted file mode 100644
index 639884b..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.junit.Assert
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-class AggregateOperatorTest {
-
-  private final val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
-  private final val tupleTypeInfo = createTypeInformation[(Int, Long, String, Long, Int)]
-  private final val emptyLongData = Array[Long]()
-
-  @Test
-  def testFieldsAggregate(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val tupleDs = env.fromCollection(emptyTupleData)
-
-    // should work
-    try {
-      tupleDs.aggregate(Aggregations.SUM, 1)
-    } catch {
-      case e: Exception => Assert.fail()
-    }
-
-    // should not work: index out of bounds
-    try {
-      tupleDs.aggregate(Aggregations.SUM, 10)
-      Assert.fail()
-    } catch {
-      case iae: IllegalArgumentException =>
-      case e: Exception => Assert.fail()
-    }
-
-    val longDs = env.fromCollection(emptyLongData)
-
-    // should not work: not applied to tuple DataSet
-    try {
-      longDs.aggregate(Aggregations.MIN, 1)
-      Assert.fail()
-    } catch {
-      case uoe: InvalidProgramException =>
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test
-  def testFieldNamesAggregate(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val tupleDs = env.fromCollection(emptyTupleData)
-
-    // should work
-    try {
-      tupleDs.aggregate(Aggregations.SUM, "_2")
-    } catch {
-      case e: Exception => Assert.fail()
-    }
-
-    // should not work: invalid field
-    try {
-      tupleDs.aggregate(Aggregations.SUM, "foo")
-      Assert.fail()
-    } catch {
-      case iae: IllegalArgumentException =>
-      case e: Exception => Assert.fail()
-    }
-
-    val longDs = env.fromCollection(emptyLongData)
-
-    // should not work: not applied to tuple DataSet
-    try {
-      longDs.aggregate(Aggregations.MIN, "_1")
-      Assert.fail()
-    } catch {
-      case uoe: InvalidProgramException =>
-      case uoe: UnsupportedOperationException =>
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test
-  def testAggregationTypes(): Unit = {
-    try {
-      val env = ExecutionEnvironment.getExecutionEnvironment
-
-      val tupleDs = env.fromCollection(emptyTupleData)
-
-      // should work: multiple aggregates
-      tupleDs.aggregate(Aggregations.SUM, 0).and(Aggregations.MIN, 4)
-
-      // should work: nested aggregates
-      tupleDs.aggregate(Aggregations.MIN, 2).and(Aggregations.SUM, 1)
-
-      // should not work: average on string
-      try {
-        tupleDs.aggregate(Aggregations.SUM, 2)
-        Assert.fail()
-      } catch {
-        case iae: UnsupportedAggregationTypeException =>
-      }
-    } catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        Assert.fail(e.getMessage)
-      }
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
deleted file mode 100644
index d4ce3b7..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.common.functions.RichCoGroupFunction
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.apache.flink.util.Collector
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-import org.junit.Assert
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.flink.api.scala._
-
-
-object CoGroupProgs {
-  val NUM_PROGRAMS: Int = 13
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * CoGroup on tuples with key field selector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-          (first, second) =>
-            var sum = 0
-            var id = 0
-            for (t <- first) {
-              sum += t._3
-              id = t._1
-            }
-            for (t <- second) {
-              sum += t._3
-              id = t._1
-            }
-            (id, sum)
-        }
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,0\n" + "2,6\n" + "3,24\n" + "4,60\n" + "5,120\n"
-
-      case 2 =>
-        /*
-         * CoGroup on two custom type inputs with key extractors
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
-
-        val coGroupDs = ds.coGroup(ds2).where(_.myInt).equalTo(_.myInt) apply {
-          (first, second) =>
-            val o = new CustomType(0, 0, "test")
-            for (c <- first) {
-              o.myInt = c.myInt
-              o.myLong += c.myLong
-            }
-            for (c <- second) {
-              o.myInt = c.myInt
-              o.myLong += c.myLong
-            }
-            o
-        }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" + "6," +
-          "210,test\n"
-
-      case 3 =>
-        /*
-         * check correctness of cogroup if UDF returns left input objects
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val ds2 = CollectionDataSets.get3TupleDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-          (first, second, out: Collector[(Int, Long, String)]) =>
-            for (t <- first) {
-              if (t._1 < 6) {
-                out.collect(t)
-              }
-            }
-        }
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-          "how are you?\n" + "5,3,I am fine.\n"
-
-      case 4 =>
-        /*
-         * check correctness of cogroup if UDF returns right input objects
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-          (first, second, out: Collector[(Int, Long, Int, String, Long)]) =>
-            for (t <- second) {
-              if (t._1 < 4) {
-                out.collect(t)
-              }
-            }
-        }
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt,2\n" + "2,3,2,Hallo Welt wie,1\n" + "3,4,3," +
-          "Hallo Welt wie gehts?,2\n" + "3,5,4,ABC,2\n" + "3,6,5,BCD,3\n"
-
-      case 5 =>
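-        /*
-         * CoGroup with a broadcast set used inside a RichCoGroupFunction
-         */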
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val intDs = CollectionDataSets.getIntDataSet(env)
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get5TupleDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).apply(
-          new RichCoGroupFunction[
-            (Int, Long, Int, String, Long),
-            (Int, Long, Int, String, Long),
-            (Int, Int, Int)] {
-            private var broadcast = 41
-
-            override def open(config: Configuration) {
-              val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
-              broadcast = ints.sum
-            }
-
-            override def coGroup(
-                first: java.lang.Iterable[(Int, Long, Int, String, Long)],
-                second: java.lang.Iterable[(Int, Long, Int, String, Long)],
-                out: Collector[(Int, Int, Int)]): Unit = {
-              var sum = 0
-              var id = 0
-              for (t <- first.asScala) {
-                sum += t._3
-                id = t._1
-              }
-              for (t <- second.asScala) {
-                sum += t._3
-                id = t._1
-              }
-              out.collect((id, sum, broadcast))
-            }
-
-        }).withBroadcastSet(intDs, "ints")
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,0,55\n" + "2,6,55\n" + "3,24,55\n" + "4,60,55\n" + "5,120,55\n"
-
-      case 6 =>
-        /*
-         * CoGroup on a tuple input with key field selector and a custom type input with
-         * key extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(2).equalTo(_.myInt) apply {
-          (first, second) =>
-            var sum = 0L
-            var id = 0
-            for (t <- first) {
-              sum += t._1
-              id = t._3
-            }
-            for (t <- second) {
-              sum += t.myLong
-              id = t.myInt
-            }
-            (id, sum, "test")
-        }
-        coGroupDs.writeAsCsv(resultPath)
-        env.execute()
-        "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" + "5," +
-          "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
-          "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
-
-      case 7 =>
-        /*
-         * CoGroup on a custom type input with key extractor and a tuple input
-         * with key field selector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
-        val coGroupDs = ds2.coGroup(ds).where(_.myInt).equalTo(2) {
-          (first, second) =>
-            var sum = 0L
-            var id = 0
-            for (t <- first) {
-              sum += t.myLong
-              id = t.myInt
-            }
-            for (t <- second) {
-              sum += t._1
-              id = t._3
-            }
-
-            new CustomType(id, sum, "test")
-        }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" + "5," +
-          "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
-          "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
-
-      case 8 =>
-        /*
-         * CoGroup with multiple key fields
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get3TupleDataSet(env)
-        val coGrouped = ds1.coGroup(ds2).where(0, 4).equalTo(0, 1) {
-          (first, second, out: Collector[(Int, Long, String)]) =>
-            val strs = first map(_._4)
-            for (t <- second) {
-              for (s <- strs) {
-                out.collect((t._1, t._2, s))
-              }
-            }
-        }
-
-        coGrouped.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
-          "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
-
-      case 9 =>
-        /*
-         * CoGroup with multiple key fields selected via tuple-returning
-         * key selector functions
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds1 = CollectionDataSets.get5TupleDataSet(env)
-        val ds2 = CollectionDataSets.get3TupleDataSet(env)
-        val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2))
-          .apply {
-          (first, second, out: Collector[(Int, Long, String)]) =>
-            val strs = first map(_._4)
-            for (t <- second) {
-              for (s <- strs) {
-                out.collect((t._1, t._2, s))
-              }
-            }
-        }
-
-        coGrouped.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
-          "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
-
-      case 10 =>
-        /*
-         * CoGroup on two custom type inputs using expression keys
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt") {
-          (first, second) =>
-            val o = new CustomType(0, 0, "test")
-            for (t <- first) {
-              o.myInt = t.myInt
-              o.myLong += t.myLong
-            }
-            for (t <- second) {
-              o.myInt = t.myInt
-              o.myLong += t.myLong
-            }
-            o
-        }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" + "6," +
-          "210,test\n"
-
-      case 11 =>
-        /*
-         * CoGroup on a POJO input with a nested expression key and a
-         * tuple-based input with a key field position
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) {
-          (first, second, out: Collector[CustomType]) =>
-            for (p <- first) {
-              for (t <- second) {
-                Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-                out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
-              }
-            }
-        }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
-
-      case 12 =>
-        /*
-         * CoGroup field-selector (expression keys) + key selector function
-         * The key selector is unnecessarily complicated (it wraps the key
-         * in a Tuple1) ;)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) {
-          (first, second, out: Collector[CustomType]) =>
-            for (p <- first) {
-              for (t <- second) {
-                Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-                out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
-              }
-            }
-        }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
-
-      case 13 =>
-        /*
-         * CoGroup field-selector (expression keys) + key selector function
-         * The key selector is simple here
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
-        val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) {
-          (first, second, out: Collector[CustomType]) =>
-            for (p <- first) {
-              for (t <- second) {
-                Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-                out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
-              }
-            }
-        }
-        coGroupDs.writeAsText(resultPath)
-        env.execute()
-        "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
-  }
-}
-
-
-@RunWith(classOf[Parameterized])
-class CoGroupITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private val curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
-
-  protected def testProgram(): Unit = {
-    expectedResult = CoGroupProgs.runProgram(curProgId, resultPath)
-  }
-
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
-
-object CoGroupITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to CoGroupProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
-  }
-}
-
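
For reference, the two coGroup invocation styles these removed programs
exercise (a value-returning function and a Collector-based function) reduce to
the following minimal sketch (hypothetical standalone job against the 0.7-era
Scala API; data and output paths are made up):

    import org.apache.flink.api.scala._
    import org.apache.flink.util.Collector

    object CoGroupSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val left = env.fromCollection(Array((1, "a"), (2, "b")))
        val right = env.fromCollection(Array((1, "x"), (3, "y")))
        // value-returning variant: exactly one output element per key group
        val sizes = left.coGroup(right).where(0).equalTo(0) {
          (first, second) => (first.size, second.size)
        }
        // Collector variant: emit any number of elements per key group
        val lefts = left.coGroup(right).where(0).equalTo(0) {
          (first, second, out: Collector[(Int, String)]) =>
            for (t <- first) out.collect(t)
        }
        sizes.writeAsCsv("file:///tmp/cogroup-sizes")
        lefts.writeAsCsv("file:///tmp/cogroup-lefts")
        env.execute()
      }
    }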

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce2936ab/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
deleted file mode 100644
index 3f0ca5f..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators
-
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException
-import org.junit.Assert
-import org.junit.Test
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
-
-class CoGroupOperatorTest {
-
-  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
-  private val customTypeData = Array[CustomType](new CustomType())
-
-  @Test
-  def testCoGroupKeyFields1(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should work
-    try {
-      ds1.coGroup(ds2).where(0).equalTo(0)
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testCoGroupKeyFields2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should not work, incompatible key types
-    ds1.coGroup(ds2).where(0).equalTo(2)
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testCoGroupKeyFields3(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should not work, incompatible number of key fields
-    ds1.coGroup(ds2).where(0, 1).equalTo(2)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCoGroupKeyFields4(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should not work, field position out of range
-    ds1.coGroup(ds2).where(5).equalTo(0)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCoGroupKeyFields5(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should not work, negative field position
-    ds1.coGroup(ds2).where(-1).equalTo(-1)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCoGroupKeyFields6(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // Should not work, field position key on custom data type
-    ds1.coGroup(ds2).where(5).equalTo(0)
-  }
-
-  @Test
-  def testCoGroupKeyFieldNames1(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should work
-    try {
-      ds1.coGroup(ds2).where("_1").equalTo("_1")
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testCoGroupKeyFieldNames2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should not work, incompatible key types
-    ds1.coGroup(ds2).where("_1").equalTo("_3")
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testCoGroupKeyFieldNames3(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should not work, incompatible number of key fields
-    ds1.coGroup(ds2).where("_1", "_2").equalTo("_3")
-  }
-
-  @Test(expected = classOf[RuntimeException])
-  def testCoGroupKeyFieldNames4(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should not work, invalid field name
-    ds1.coGroup(ds2).where("_6").equalTo("_1")
-  }
-
-  @Test(expected = classOf[RuntimeException])
-  def testCoGroupKeyFieldNames5(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should not work, invalid field name
-    ds1.coGroup(ds2).where("_1").equalTo("bar")
-  }
-
-  @Test(expected = classOf[RuntimeException])
-  def testCoGroupKeyFieldNames6(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // Should not work, tuple field name on custom data type
-    ds1.coGroup(ds2).where("_3").equalTo("_1")
-  }
-
-  @Test
-  def testCoGroupKeyExpressions1(): Unit =  {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // Should work
-    try {
-      ds1.coGroup(ds2).where("myInt").equalTo("myInt")
-    } catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testCoGroupKeyExpressions2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, incompatible key types
-    ds1.coGroup(ds2).where("myInt").equalTo("myString")
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testCoGroupKeyExpressions3(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, incompatible number of keys
-    ds1.coGroup(ds2).where("myInt", "myString").equalTo("myString")
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCoGroupKeyExpressions4(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // should not work, non-existent key
-    ds1.coGroup(ds2).where("myNonExistent").equalTo("myInt")
-  }
-
-  @Test
-  def testCoGroupKeySelectors1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // Should work
-    try {
-      ds1.coGroup(ds2).where { _.myLong } equalTo { _.myLong }
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test
-  def testCoGroupKeyMixing1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(customTypeData)
-    val ds2 = env.fromCollection(emptyTupleData)
-
-    // Should work
-    try {
-      ds1.coGroup(ds2).where { _.myLong }.equalTo(3)
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test
-  def testCoGroupKeyMixing2(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // Should work
-    try {
-      ds1.coGroup(ds2).where(3).equalTo { _.myLong }
-    }
-    catch {
-      case e: Exception => Assert.fail()
-    }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testCoGroupKeyMixing3(): Unit =  {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // Should not work, incompatible types
-    ds1.coGroup(ds2).where(2).equalTo { _.myLong }
-  }
-
-  @Test(expected = classOf[IncompatibleKeysException])
-  def testCoGroupKeyMixing4(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = env.fromCollection(emptyTupleData)
-    val ds2 = env.fromCollection(customTypeData)
-
-    // Should not work, more than one field position key
-    ds1.coGroup(ds2).where(1, 3).equalTo { _.myLong }
-  }
-}
-
-
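
The eager key validation pinned down by the removed operator test can be seen
in isolation with a snippet like this (hypothetical, against the same 0.7-era
API; the exception is raised while the operator is declared, before anything
executes):

    import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException
    import org.apache.flink.api.scala._

    object KeyValidationSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val ds = env.fromCollection(Array((1, "a")))
        try {
          // Int key on the left, String key on the right: rejected eagerly
          ds.coGroup(ds).where(0).equalTo(1)
          println("unexpected: incompatible keys were accepted")
        } catch {
          case _: IncompatibleKeysException =>
            println("incompatible keys rejected at graph construction time")
        }
      }
    }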

