flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [6/9] [FLINK-1171] Move Scala API tests to flink-tests project
Date Sat, 18 Oct 2014 15:17:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
deleted file mode 100644
index 22e4914..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators.translation
-
-import org.apache.flink.api.common.operators.{GenericDataSourceBase, GenericDataSinkBase}
-import org.apache.flink.api.java.operators.translation.{KeyExtractingMapper,
-PlanUnwrappingReduceOperator}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.java.typeutils.TupleTypeInfo
-import org.junit.Assert._
-import org.apache.flink.api.common.operators.base.MapOperatorBase
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-class ReduceTranslationTest {
-  @Test
-  def translateNonGroupedReduce(): Unit = {
-    try {
-      val DOP = 8
-      val env = ExecutionEnvironment.createLocalEnvironment(DOP)
-
-      val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
-
-
-      initialData reduce { (v1, v2) => v1 } print()
-
-      val p = env.createProgramPlan(
-
-)
-      val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next
-      val reducer: ReduceOperatorBase[_, _] = sink.getInput.asInstanceOf[ReduceOperatorBase[_, _]]
-
-      assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getInputType)
-      assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getOutputType)
-      assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0)
-      assertTrue(reducer.getDegreeOfParallelism == 1 || reducer.getDegreeOfParallelism == -1)
-      assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
-    }
-    catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Test caused an error: " + e.getMessage)
-      }
-    }
-  }
-
-  @Test
-  def translateGroupedReduceNoMapper(): Unit = {
-    try {
-      val DOP: Int = 8
-      val env = ExecutionEnvironment.createLocalEnvironment(DOP)
-
-      val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
-
-      initialData.groupBy(2) reduce { (v1, v2) => v1 } print()
-
-      val p = env.createProgramPlan()
-
-      val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next
-      val reducer: ReduceOperatorBase[_, _] = sink.getInput.asInstanceOf[ReduceOperatorBase[_, _]]
-      assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getInputType)
-      assertEquals(initialData.javaSet.getType, reducer.getOperatorInfo.getOutputType)
-      assertTrue(reducer.getDegreeOfParallelism == DOP || reducer.getDegreeOfParallelism == -1)
-      assertArrayEquals(Array[Int](2), reducer.getKeyColumns(0))
-      assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
-    }
-    catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Test caused an error: " + e.getMessage)
-      }
-    }
-  }
-
-  @Test
-  def translateGroupedReduceWithKeyExtractor(): Unit = {
-    try {
-      val DOP: Int = 8
-      val env = ExecutionEnvironment.createLocalEnvironment(DOP)
-
-      val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
-
-      initialData.groupBy { _._2 }. reduce { (v1, v2) => v1 } setParallelism(4) print()
-
-      val p = env.createProgramPlan()
-      val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next
-      val keyProjector: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_,
-        _, _]]
-      val reducer: PlanUnwrappingReduceOperator[_, _] = keyProjector.getInput
-        .asInstanceOf[PlanUnwrappingReduceOperator[_, _]]
-      val keyExtractor: MapOperatorBase[_, _, _] = reducer.getInput
-        .asInstanceOf[MapOperatorBase[_, _, _]]
-      assertEquals(1, keyExtractor.getDegreeOfParallelism)
-      assertEquals(4, reducer.getDegreeOfParallelism)
-      assertEquals(4, keyProjector.getDegreeOfParallelism)
-      val keyValueInfo = new TupleTypeInfo(
-        BasicTypeInfo.STRING_TYPE_INFO,
-        createTypeInformation[(Double, String, Long)])
-      assertEquals(initialData.javaSet.getType, keyExtractor.getOperatorInfo.getInputType)
-      assertEquals(keyValueInfo, keyExtractor.getOperatorInfo.getOutputType)
-      assertEquals(keyValueInfo, reducer.getOperatorInfo.getInputType)
-      assertEquals(keyValueInfo, reducer.getOperatorInfo.getOutputType)
-      assertEquals(keyValueInfo, keyProjector.getOperatorInfo.getInputType)
-      assertEquals(initialData.javaSet.getType, keyProjector.getOperatorInfo.getOutputType)
-      assertEquals(
-        classOf[KeyExtractingMapper[_, _]],
-        keyExtractor.getUserCodeWrapper.getUserCodeClass)
-      assertTrue(keyExtractor.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
-    }
-    catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Test caused an error: " + e.getMessage)
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
deleted file mode 100644
index 9fb0980..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.{GenericPairComparator, TypeComparator, TypeSerializer}
-import org.apache.flink.api.common.typeutils.base.{DoubleComparator, DoubleSerializer, IntComparator, IntSerializer}
-
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator
-import org.apache.flink.api.scala.runtime.tuple.base.PairComparatorTestBase
-import org.apache.flink.api.scala.typeutils.CaseClassComparator
-
-class GenericPairComparatorTest
-  extends PairComparatorTestBase[(Int, String, Double), (Int, Float, Long, Double)] {
-
-  override protected def createComparator(ascending: Boolean):
-  GenericPairComparator[(Int, String, Double), (Int, Float, Long, Double)] = {
-    val fields1  = Array[Int](0, 2)
-    val fields2 = Array[Int](0, 3)
-
-    val comps1 =
-      Array[TypeComparator[_]](new IntComparator(ascending), new DoubleComparator(ascending))
-    val comps2 =
-      Array[TypeComparator[_]](new IntComparator(ascending), new DoubleComparator(ascending))
-
-    val sers1 =
-      Array[TypeSerializer[_]](IntSerializer.INSTANCE, DoubleSerializer.INSTANCE)
-    val sers2 =
-      Array[TypeSerializer[_]](IntSerializer.INSTANCE, DoubleSerializer.INSTANCE)
-
-    val comp1 = new CaseClassComparator[(Int, String, Double)](fields1, comps1, sers1)
-    val comp2 = new CaseClassComparator[(Int, Float, Long, Double)](fields2, comps2, sers2)
-
-    new GenericPairComparator[(Int, String, Double), (Int, Float, Long, Double)](comp1, comp2)
-  }
-
-  protected def getSortedTestData:
-  (Array[(Int, String, Double)], Array[(Int, Float, Long, Double)]) = {
-    (dataISD, dataIDL)
-  }
-
-  private val dataISD = Array(
-    (4, "hello", 20.0),
-    (4, "world", 23.2),
-    (5, "hello", 18.0),
-    (5, "world", 19.2),
-    (6, "hello", 16.0),
-    (6, "world", 17.2),
-    (7, "hello", 14.0),
-    (7,"world", 15.2))
-
-  private val dataIDL = Array(
-    (4, 0.11f, 14L, 20.0),
-    (4, 0.221f, 15L, 23.2),
-    (5, 0.33f, 15L, 18.0),
-    (5, 0.44f, 20L, 19.2),
-    (6, 0.55f, 20L, 16.0),
-    (6, 0.66f, 29L, 17.2),
-    (7, 0.77f, 29L, 14.0),
-    (7, 0.88f, 34L, 15.2))
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
deleted file mode 100644
index adc46ac..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.TypeComparator
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
-
-import org.apache.flink.api.scala._
-
-
-class TupleComparatorILD2Test extends TupleComparatorTestBase[(Int, Long, Double)] {
-
-  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(0, 1), Array(ascending, ascending), 0)
-  }
-
-  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
-  }
-
-  protected def getSortedTestData: Array[(Int, Long, Double)] = {
-    dataISD
-  }
-
-  private val dataISD = Array(
-    (4, 14L, 20.0),
-    (4, 15L, 23.2),
-    (5, 15L, 20.0),
-    (5, 20L, 20.0),
-    (6, 20L, 23.2),
-    (6, 29L, 20.0),
-    (7, 29L, 20.0),
-    (7, 34L, 23.2)
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
deleted file mode 100644
index 377f985..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
-
-
-class TupleComparatorILD3Test extends TupleComparatorTestBase[(Int, Long, Double)] {
-
-  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(0, 1, 2), Array(ascending, ascending, ascending), 0)
-  }
-
-  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
-  }
-
-  protected def getSortedTestData: Array[(Int, Long, Double)] = {
-    dataISD
-  }
-
-  private val dataISD = Array(
-    (4, 4L, 20.0),
-    (4, 4L, 23.2),
-    (4, 9L, 20.0),
-    (5, 4L, 20.0),
-    (5, 4L, 23.2),
-    (5, 9L, 20.0),
-    (6, 4L, 20.0),
-    (6, 4L, 23.2)
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
deleted file mode 100644
index 1578951..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
-
-
-class TupleComparatorILDC3Test extends TupleComparatorTestBase[(Int, Long, Double)] {
-
-  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(2, 0, 1), Array(ascending, ascending, ascending), 0)
-  }
-
-  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
-  }
-
-  protected def getSortedTestData: Array[(Int, Long, Double)] = {
-    dataISD
-  }
-
-  private val dataISD = Array(
-    (4, 4L, 20.0),
-    (5, 1L, 20.0),
-    (5, 2L, 20.0),
-    (5, 10L, 23.0),
-    (5, 19L, 24.0),
-    (5, 20L, 24.0),
-    (5, 24L, 25.0),
-    (5, 25L, 25.0)
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
deleted file mode 100644
index 51e08cc..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
-
-import org.apache.flink.api.scala._
-
-class TupleComparatorILDX1Test extends TupleComparatorTestBase[(Int, Long, Double)] {
-
-  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(1), Array(ascending), 0)
-  }
-
-  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
-  }
-
-  protected def getSortedTestData: Array[(Int, Long, Double)] = {
-    dataISD
-  }
-
-  private val dataISD = Array(
-    (4, 4L, 20.0),
-    (4, 5L, 23.2),
-    (4, 9L, 20.0),
-    (4, 10L, 24.0),
-    (4, 19L, 23.2),
-    (4, 20L, 24.0),
-    (4, 24L, 20.0),
-    (4, 25L, 23.2)
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
deleted file mode 100644
index 3dbaabf..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
-
-import org.apache.flink.api.scala._
-
-class TupleComparatorILDXC2Test extends TupleComparatorTestBase[(Int, Long, Double)] {
-
-  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(2, 1), Array(ascending, ascending), 0)
-  }
-
-  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
-    val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
-  }
-
-  protected def getSortedTestData: Array[(Int, Long, Double)] = {
-    dataISD
-  }
-
-  private val dataISD = Array(
-    (4, 4L, 20.0),
-    (4, 5L, 20.0),
-    (4, 3L, 23.0),
-    (4, 19L, 23.0),
-    (4, 17L, 24.0),
-    (4, 18L, 24.0),
-    (4, 24L, 25.0),
-    (4, 25L, 25.0)
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
deleted file mode 100644
index 252ae79..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
-
-class TupleComparatorISD1Test extends TupleComparatorTestBase[(Int, String, Double)] {
-
-  protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = {
-    val ti = createTypeInformation[(Int, String, Double)]
-    ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]]
-      .createComparator(Array(0), Array(ascending),0)
-  }
-
-  protected def createSerializer: TypeSerializer[(Int, String, Double)] = {
-    val ti = createTypeInformation[(Int, String, Double)]
-    ti.createSerializer()
-  }
-
-  protected def getSortedTestData: Array[(Int, String, Double)] = {
-    dataISD
-  }
-
-  private val dataISD = Array(
-    (4, "hello", 20.0),
-    (5, "hello", 23.2),
-    (6, "world", 20.0),
-    (7, "hello", 20.0),
-    (8, "hello", 23.2),
-    (9, "world", 20.0),
-    (10, "hello", 20.0),
-    (11, "hello", 23.2)
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala
deleted file mode 100644
index 37e775e..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
-
-class TupleComparatorISD2Test extends TupleComparatorTestBase[(Int, String, Double)] {
-
-  protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = {
-    val ti = createTypeInformation[(Int, String, Double)]
-    ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]]
-      .createComparator(Array(0, 1), Array(ascending, ascending), 0)
-  }
-
-  protected def createSerializer: TypeSerializer[(Int, String, Double)] = {
-    val ti = createTypeInformation[(Int, String, Double)]
-    ti.createSerializer()
-  }
-
-  protected def getSortedTestData: Array[(Int, String, Double)] = {
-    dataISD
-  }
-
-  private val dataISD = Array(
-    (4, "hello", 20.0),
-    (4, "world", 23.2),
-    (5, "hello", 20.0),
-    (5, "world", 20.0),
-    (6, "hello", 23.2),
-    (6, "world", 20.0),
-    (7, "hello", 20.0),
-    (7, "world", 23.2)
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala
deleted file mode 100644
index 227b041..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
-
-class TupleComparatorISD3Test extends TupleComparatorTestBase[(Int, String, Double)] {
-
-  protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = {
-    val ti = createTypeInformation[(Int, String, Double)]
-    ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]]
-      .createComparator(Array(0, 1, 2), Array(ascending, ascending, ascending), 0)
-  }
-
-  protected def createSerializer: TypeSerializer[(Int, String, Double)] = {
-    val ti = createTypeInformation[(Int, String, Double)]
-    ti.createSerializer()
-  }
-
-  protected def getSortedTestData: Array[(Int, String, Double)] = {
-    dataISD
-  }
-
-  private val dataISD = Array(
-    (4, "hello", 20.0),
-    (4, "hello", 23.2),
-    (4, "world", 20.0),
-    (5, "hello", 20.0),
-    (5, "hello", 23.2),
-    (5, "world", 20.0),
-    (6, "hello", 20.0),
-    (6, "hello", 23.2)
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
deleted file mode 100644
index a4149dd..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.util.StringUtils
-import org.junit.Assert
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-import scala.collection.JavaConverters._
-
-import java.util.Random
-
-class TupleSerializerTest {
-
-  @Test
-  def testTuple1Int(): Unit = {
-    val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), Tuple1(Int.MaxValue),
-      Tuple1(Int.MinValue))
-    runTests(testTuples)
-  }
-
-  @Test
-  def testTuple1String(): Unit = {
-    val rnd: Random = new Random(68761564135413L)
-    val testTuples = Array(
-      Tuple1(StringUtils.getRandomString(rnd, 10, 100)),
-      Tuple1("abc"),
-      Tuple1(""),
-      Tuple1(StringUtils.getRandomString(rnd, 30, 170)),
-      Tuple1(StringUtils.getRandomString(rnd, 15, 50)),
-      Tuple1(""))
-    runTests(testTuples)
-  }
-
-  @Test
-  def testTuple1StringArray(): Unit = {
-    val rnd: Random = new Random(289347567856686223L)
-
-    val arr1 = Array(
-      "abc",
-      "",
-      StringUtils.getRandomString(rnd, 10, 100),
-      StringUtils.getRandomString(rnd, 15, 50),
-      StringUtils.getRandomString(rnd, 30, 170),
-      StringUtils.getRandomString(rnd, 14, 15),
-      "")
-    val arr2 = Array(
-      "foo",
-      "",
-      StringUtils.getRandomString(rnd, 10, 100),
-      StringUtils.getRandomString(rnd, 1000, 5000),
-      StringUtils.getRandomString(rnd, 30000, 35000),
-      StringUtils.getRandomString(rnd, 100 * 1024, 105 * 1024),
-      "bar")
-    val testTuples = Array(Tuple1(arr1), Tuple1(arr2))
-    runTests(testTuples)
-  }
-
-  @Test
-  def testTuple2StringDouble(): Unit = {
-    val rnd: Random = new Random(807346528946L)
-
-    val testTuples = Array(
-      (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble),
-      (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble),
-      (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble),
-      ("", rnd.nextDouble),
-      (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble),
-      (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble))
-    runTests(testTuples)
-  }
-
-  @Test
-  def testTuple2StringStringArray(): Unit = {
-    val rnd: Random = new Random(289347567856686223L)
-
-    val arr1 = Array(
-      "abc",
-      "",
-      StringUtils.getRandomString(rnd, 10, 100),
-      StringUtils.getRandomString(rnd, 15, 50),
-      StringUtils.getRandomString(rnd, 30, 170),
-      StringUtils.getRandomString(rnd, 14, 15), "")
-    val arr2 = Array(
-      "foo",
-      "",
-      StringUtils.getRandomString(rnd, 10, 100),
-      StringUtils.getRandomString(rnd, 1000, 5000),
-      StringUtils.getRandomString(rnd, 30000, 35000),
-      StringUtils.getRandomString(rnd, 100 * 1024, 105 * 1024),
-      "bar")
-    val testTuples = Array(
-      (StringUtils.getRandomString(rnd, 30, 170), arr1),
-      (StringUtils.getRandomString(rnd, 30, 170), arr2),
-      (StringUtils.getRandomString(rnd, 30, 170), arr1),
-      (StringUtils.getRandomString(rnd, 30, 170), arr2),
-      (StringUtils.getRandomString(rnd, 30, 170), arr2))
-    runTests(testTuples)
-  }
-
-  @Test
-  def testTuple5CustomObjects(): Unit = {
-    val rnd: Random = new Random(807346528946L)
-
-    val a = new SimpleTypes
-    val b =  new SimpleTypes(rnd.nextInt, rnd.nextLong, rnd.nextInt.asInstanceOf[Byte],
-        StringUtils.getRandomString(rnd, 10, 100), rnd.nextInt.asInstanceOf[Short], rnd.nextDouble)
-    val c = new SimpleTypes(rnd.nextInt, rnd.nextLong, rnd.nextInt.asInstanceOf[Byte],
-        StringUtils.getRandomString(rnd, 10, 100), rnd.nextInt.asInstanceOf[Short], rnd.nextDouble)
-    val d = new SimpleTypes(rnd.nextInt, rnd.nextLong, rnd.nextInt.asInstanceOf[Byte],
-        StringUtils.getRandomString(rnd, 10, 100), rnd.nextInt.asInstanceOf[Short], rnd.nextDouble)
-    val e = new SimpleTypes(rnd.nextInt, rnd.nextLong, rnd.nextInt.asInstanceOf[Byte],
-        StringUtils.getRandomString(rnd, 10, 100), rnd.nextInt.asInstanceOf[Short], rnd.nextDouble)
-    val f = new SimpleTypes(rnd.nextInt, rnd.nextLong, rnd.nextInt.asInstanceOf[Byte],
-        StringUtils.getRandomString(rnd, 10, 100), rnd.nextInt.asInstanceOf[Short], rnd.nextDouble)
-    val g = new SimpleTypes(rnd.nextInt, rnd.nextLong, rnd.nextInt.asInstanceOf[Byte],
-        StringUtils.getRandomString(rnd, 10, 100), rnd.nextInt.asInstanceOf[Short], rnd.nextDouble)
-
-    val o1 = new ComplexNestedObject1(5626435)
-    val o2 = new ComplexNestedObject1(76923)
-    val o3 = new ComplexNestedObject1(-1100)
-    val o4 = new ComplexNestedObject1(0)
-    val o5 = new ComplexNestedObject1(44)
-
-    val co1 = new ComplexNestedObject2(rnd)
-    val co2 = new ComplexNestedObject2
-    val co3 = new ComplexNestedObject2(rnd)
-    val co4 = new ComplexNestedObject2(rnd)
-
-    val b1 = new Book(976243875L, "The Serialization Odysse", 42)
-    val b2 = new Book(0L, "Debugging byte streams", 1337)
-    val b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE)
-    val b4 = new Book(Long.MaxValue, "The joy of bits and bytes", 0xDEADBEEF)
-    val b5 = new Book(Long.MaxValue, "Winnign a prize for creative test strings", 0xBADF00)
-    val b6 = new Book(-2L, "Distributed Systems", 0xABCDEF0123456789L)
-
-    val list = List("A", "B", "C", "D", "E")
-    val ba1 = new BookAuthor(976243875L, list.asJava, "Arno Nym")
-
-    val list2 = List[String]()
-    val ba2 = new BookAuthor(987654321L, list2.asJava, "The Saurus")
-
-    val testTuples = Array(
-      (a, b1, o1, ba1, co1),
-      (b, b2, o2, ba2, co2),
-      (c, b3, o3, ba1, co3),
-      (d, b2, o4, ba1, co4),
-      (e, b4, o5, ba2, co4),
-      (f, b5, o1, ba2, co4),
-      (g, b6, o4, ba1, co2))
-    runTests(testTuples)
-  }
-
-  private final def runTests[T <: Product : TypeInformation](instances: Array[T]) {
-    try {
-      val tupleTypeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]]
-      val serializer = tupleTypeInfo.createSerializer
-      val tupleClass = tupleTypeInfo.getTypeClass
-      val test = new TupleSerializerTestInstance[T](serializer, tupleClass, -1, instances)
-      test.testAll()
-    } catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        Assert.fail(e.getMessage)
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTestInstance.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTestInstance.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTestInstance.scala
deleted file mode 100644
index 3b923bd..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTestInstance.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.junit.Assert._
-import org.apache.flink.api.common.typeutils.SerializerTestInstance
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.junit.Assert
-import org.junit.Test
-
-
-class TupleSerializerTestInstance[T <: Product] (
-    serializer: TypeSerializer[T],
-    typeClass: Class[T],
-    length: Int,
-    testData: Array[T])
-  extends SerializerTestInstance[T](serializer, typeClass, length, testData: _*) {
-
-  @Test
-  override def testInstantiate(): Unit = {
-    try {
-      val serializer: TypeSerializer[T] = getSerializer
-      val instance: T = serializer.createInstance
-      assertNotNull("The created instance must not be null.", instance)
-      val tpe: Class[T] = getTypeClass
-      assertNotNull("The test is corrupt: type class is null.", tpe)
-      // We cannot check this because Tuple1 instances are not actually of type Tuple1
-      // but something like Tuple1$mcI$sp
-//      assertEquals("Type of the instantiated object is wrong.", tpe, instance.getClass)
-    }
-    catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Exception in test: " + e.getMessage)
-      }
-    }
-  }
-
-  protected override def deepEquals(message: String, shouldTuple: T, isTuple: T) {
-    Assert.assertEquals(shouldTuple.productArity, isTuple.productArity)
-    for (i <- 0 until shouldTuple.productArity) {
-      val should = shouldTuple.productElement(i)
-      val is = isTuple.productElement(i)
-      if (should.getClass.isArray) {
-        should match {
-          case booleans: Array[Boolean] =>
-            Assert.assertTrue(message, booleans.sameElements(is.asInstanceOf[Array[Boolean]]))
-          case bytes: Array[Byte] =>
-            assertArrayEquals(message, bytes, is.asInstanceOf[Array[Byte]])
-          case shorts: Array[Short] =>
-            assertArrayEquals(message, shorts, is.asInstanceOf[Array[Short]])
-          case ints: Array[Int] =>
-            assertArrayEquals(message, ints, is.asInstanceOf[Array[Int]])
-          case longs: Array[Long] =>
-            assertArrayEquals(message, longs, is.asInstanceOf[Array[Long]])
-          case floats: Array[Float] =>
-            assertArrayEquals(message, floats, is.asInstanceOf[Array[Float]], 0.0f)
-          case doubles: Array[Double] =>
-            assertArrayEquals(message, doubles, is.asInstanceOf[Array[Double]], 0.0)
-          case chars: Array[Char] =>
-            assertArrayEquals(message, chars, is.asInstanceOf[Array[Char]])
-          case _ =>
-            assertArrayEquals(
-              message,
-              should.asInstanceOf[Array[AnyRef]],
-              is.asInstanceOf[Array[AnyRef]])
-        }
-      } else {
-        assertEquals(message, should, is)
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
deleted file mode 100644
index 9809dd2..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime.tuple.base
-
-import org.junit.Assert.assertTrue
-import org.junit.Assert.fail
-import org.apache.flink.api.common.typeutils.TypePairComparator
-import org.junit.Test
-
-/**
- * Abstract test base for PairComparators.
- */
-abstract class PairComparatorTestBase[T, R] {
-  protected def createComparator(ascending: Boolean): TypePairComparator[T, R]
-
-  protected def getSortedTestData: (Array[T], Array[R])
-
-  @Test
-  def testEqualityWithReference(): Unit = {
-    try {
-      val comparator = getComparator(true)
-      
-      val (dataT, dataR) = getSortedData
-      for (i <- 0 until dataT.length) {
-        comparator.setReference(dataT(i))
-        assertTrue(comparator.equalToReference(dataR(i)))
-      }
-    } catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Exception in test: " + e.getMessage)
-      }
-    }
-  }
-
-  @Test
-  def testInequalityWithReference(): Unit = {
-    testGreatSmallAscDescWithReference(true)
-    testGreatSmallAscDescWithReference(false)
-  }
-
-  protected def testGreatSmallAscDescWithReference(ascending: Boolean) {
-    try {
-      val (dataT, dataR) = getSortedData
-      val comparator = getComparator(ascending)
-      for (x <- 0 until (dataT.length - 1)) {
-        for (y <- (x + 1) until dataR.length) {
-          comparator.setReference(dataT(x))
-          if (ascending) {
-            assertTrue(comparator.compareToReference(dataR(y)) > 0)
-          }
-          else {
-            assertTrue(comparator.compareToReference(dataR(y)) < 0)
-          }
-        }
-      }
-    } catch {
-      case e: Exception => {
-        System.err.println(e.getMessage)
-        e.printStackTrace()
-        fail("Exception in test: " + e.getMessage)
-      }
-    }
-  }
-
-  protected def getComparator(ascending: Boolean): TypePairComparator[T, R] = {
-    val comparator: TypePairComparator[T, R] = createComparator(ascending)
-    if (comparator == null) {
-      throw new RuntimeException("Test case corrupt. Returns null as comparator.")
-    }
-    comparator
-  }
-
-  protected def getSortedData: (Array[T], Array[R]) = {
-    val (dataT, dataR) = getSortedTestData
-
-    if (dataT == null || dataR == null) {
-      throw new RuntimeException("Test case corrupt. Returns null as test data.")
-    }
-    if (dataT.length < 2 || dataR.length < 2) {
-      throw new RuntimeException("Test case does not provide enough sorted test data.")
-    }
-    (dataT, dataR)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
deleted file mode 100644
index 506b264..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime.tuple.base
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassComparator}
-import org.junit.Assert._
-
-abstract class TupleComparatorTestBase[T <: Product] extends ComparatorTestBase[T] {
-  protected override def deepEquals(message: String, should: T, is: T) {
-    for (i <- 0 until should.productArity) {
-      assertEquals(should.productElement(i), is.productElement(i))
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
deleted file mode 100644
index 2b2d3a9..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.types
-
-import java.io.DataInput
-import java.io.DataOutput
-import org.apache.flink.api.common.typeinfo._
-import org.apache.flink.api.java.typeutils._
-import org.apache.flink.types.{IntValue, StringValue}
-import org.apache.hadoop.io.Writable
-import org.junit.Assert
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-class MyWritable extends Writable {
-  def write(out: DataOutput) {
-  }
-
-  def readFields(in: DataInput) {
-  }
-}
-
-case class CustomCaseClass(a: String, b: Int)
-
-class CustomType(var myField1: String, var myField2: Int) {
-  def this() {
-    this(null, 0)
-  }
-}
-
-class MyObject[A](var a: A) {
-  def this() { this(null.asInstanceOf[A]) }
-}
-
-class TypeInformationGenTest {
-
-  @Test
-  def testBasicType(): Unit = {
-    val ti = createTypeInformation[Boolean]
-
-    Assert.assertTrue(ti.isBasicType)
-    Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti)
-    Assert.assertEquals(classOf[java.lang.Boolean], ti.getTypeClass)
-  }
-
-  @Test
-  def testWritableType(): Unit = {
-    val ti = createTypeInformation[MyWritable]
-
-    Assert.assertTrue(ti.isInstanceOf[WritableTypeInfo[_]])
-    Assert.assertEquals(classOf[MyWritable], ti.asInstanceOf[WritableTypeInfo[_]].getTypeClass)
-  }
-
-  @Test
-  def testTupleWithBasicTypes(): Unit = {
-    val ti = createTypeInformation[(Int, Long, Double, Float, Boolean, String, Char, Short, Byte)]
-
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(9, ti.getArity)
-    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
-    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(classOf[Tuple9[_,_,_,_,_,_,_,_,_]], tti.getTypeClass)
-    for (i <- 0 until 0) {
-      Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
-    }
-
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(0))
-    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1))
-    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tti.getTypeAt(2))
-    Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti.getTypeAt(3))
-    Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(4))
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(5))
-    Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, tti.getTypeAt(6))
-    Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, tti.getTypeAt(7))
-    Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, tti.getTypeAt(8))
-  }
-
-  @Test
-  def testTupleWithTuples(): Unit = {
-    val ti = createTypeInformation[(Tuple1[String], Tuple1[Int], Tuple2[Long, Long])]
-
-    Assert.assertTrue(ti.isTupleType())
-    Assert.assertEquals(3, ti.getArity)
-    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
-    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(classOf[Tuple3[_, _, _]], tti.getTypeClass)
-    Assert.assertTrue(tti.getTypeAt(0).isTupleType())
-    Assert.assertTrue(tti.getTypeAt(1).isTupleType())
-    Assert.assertTrue(tti.getTypeAt(2).isTupleType())
-    Assert.assertEquals(classOf[Tuple1[_]], tti.getTypeAt(0).getTypeClass)
-    Assert.assertEquals(classOf[Tuple1[_]], tti.getTypeAt(1).getTypeClass)
-    Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeAt(2).getTypeClass)
-    Assert.assertEquals(1, tti.getTypeAt(0).getArity)
-    Assert.assertEquals(1, tti.getTypeAt(1).getArity)
-    Assert.assertEquals(2, tti.getTypeAt(2).getArity)
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO,
-      tti.getTypeAt(0).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
-    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO,
-      tti.getTypeAt(1).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
-    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
-      tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
-    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
-      tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
-  }
-
-  @Test
-  def testCaseClass(): Unit = {
-    val ti = createTypeInformation[CustomCaseClass]
-
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(2, ti.getArity)
-    Assert.assertEquals(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
-    Assert.assertEquals(
-      BasicTypeInfo.INT_TYPE_INFO,
-      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
-    Assert.assertEquals(
-      classOf[CustomCaseClass],ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeClass())
-  }
-
-  @Test
-  def testCustomType(): Unit = {
-    val ti = createTypeInformation[CustomType]
-
-    Assert.assertFalse(ti.isBasicType)
-    Assert.assertFalse(ti.isTupleType)
-    Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
-    Assert.assertEquals(ti.getTypeClass, classOf[CustomType])
-  }
-
-  @Test
-  def testTupleWithCustomType(): Unit = {
-    val ti = createTypeInformation[(Long, CustomType)]
-
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(2, ti.getArity)
-    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeClass)
-    Assert.assertEquals(classOf[java.lang.Long], tti.getTypeAt(0).getTypeClass)
-    Assert.assertTrue(tti.getTypeAt(1).isInstanceOf[PojoTypeInfo[_]])
-    Assert.assertEquals(classOf[CustomType], tti.getTypeAt(1).getTypeClass)
-  }
-
-  @Test
-  def testValue(): Unit = {
-    val ti = createTypeInformation[StringValue]
-
-    Assert.assertFalse(ti.isBasicType)
-    Assert.assertFalse(ti.isTupleType)
-    Assert.assertTrue(ti.isInstanceOf[ValueTypeInfo[_]])
-    Assert.assertEquals(ti.getTypeClass, classOf[StringValue])
-    Assert.assertTrue(TypeExtractor.getForClass(classOf[StringValue])
-      .isInstanceOf[ValueTypeInfo[_]])
-    Assert.assertEquals(TypeExtractor.getForClass(classOf[StringValue]).getTypeClass,
-      ti.getTypeClass)
-  }
-
-  @Test
-  def testTupleOfValues(): Unit = {
-    val ti = createTypeInformation[(StringValue, IntValue)]
-    Assert.assertFalse(ti.isBasicType)
-    Assert.assertTrue(ti.isTupleType)
-    Assert.assertEquals(
-      classOf[StringValue],
-      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0).getTypeClass)
-    Assert.assertEquals(
-      classOf[IntValue],
-      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1).getTypeClass)
-  }
-
-
-  @Test
-  def testBasicArray(): Unit = {
-    val ti = createTypeInformation[Array[String]]
-
-    Assert.assertFalse(ti.isBasicType)
-    Assert.assertFalse(ti.isTupleType)
-    Assert.assertTrue(ti.isInstanceOf[BasicArrayTypeInfo[_, _]] ||
-      ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
-    if (ti.isInstanceOf[BasicArrayTypeInfo[_, _]]) {
-      Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, ti)
-    }
-    else {
-      Assert.assertEquals(
-        BasicTypeInfo.STRING_TYPE_INFO,
-        ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo)
-    }
-  }
-
-  @Test
-  def testPrimitiveArray(): Unit = {
-    val ti = createTypeInformation[Array[Boolean]]
-
-    Assert.assertTrue(ti.isInstanceOf[PrimitiveArrayTypeInfo[_]])
-    Assert.assertEquals(ti, PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
-  }
-
-  @Test
-  def testCustomArray(): Unit = {
-    val ti = createTypeInformation[Array[CustomType]]
-    Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
-    Assert.assertEquals(
-      classOf[CustomType],
-      ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentType)
-  }
-
-  @Test
-  def testTupleArray(): Unit = {
-    val ti = createTypeInformation[Array[(String, String)]]
-
-    Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
-    val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
-    Assert.assertTrue(oati.getComponentInfo.isTupleType)
-    val tti = oati.getComponentInfo.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
-    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
-  }
-
-  @Test
-  def testParamertizedCustomObject(): Unit = {
-    val ti = createTypeInformation[MyObject[String]]
-
-    Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
-  }
-
-  @Test
-  def testTupleWithPrimitiveArray(): Unit = {
-    val ti = createTypeInformation[(Array[Int], Array[Double], Array[Long],
-      Array[Byte], Array[Char], Array[Float], Array[Short], Array[Boolean],
-      Array[String])]
-
-    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
-    Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(0))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(1))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(2))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(3))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(4))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(5))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(6))
-    Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(7))
-    Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, tti.getTypeAt(8))
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
deleted file mode 100644
index 09e049b..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.util
-
-import org.apache.hadoop.io.IntWritable
-
-import org.apache.flink.api.scala._
-
-import scala.collection.mutable
-import scala.reflect.classTag
-import scala.util.Random
-
-/**
- * #################################################################################################
- *
- * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. 
- * IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
- *
- * #################################################################################################
- */
-object CollectionDataSets {
-  def get3TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = {
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Hi"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((3, 2L, "Hello world"))
-    data.+=((4, 3L, "Hello world, how are you?"))
-    data.+=((5, 3L, "I am fine."))
-    data.+=((6, 3L, "Luke Skywalker"))
-    data.+=((7, 4L, "Comment#1"))
-    data.+=((8, 4L, "Comment#2"))
-    data.+=((9, 4L, "Comment#3"))
-    data.+=((10, 4L, "Comment#4"))
-    data.+=((11, 5L, "Comment#5"))
-    data.+=((12, 5L, "Comment#6"))
-    data.+=((13, 5L, "Comment#7"))
-    data.+=((14, 5L, "Comment#8"))
-    data.+=((15, 5L, "Comment#9"))
-    data.+=((16, 6L, "Comment#10"))
-    data.+=((17, 6L, "Comment#11"))
-    data.+=((18, 6L, "Comment#12"))
-    data.+=((19, 6L, "Comment#13"))
-    data.+=((20, 6L, "Comment#14"))
-    data.+=((21, 6L, "Comment#15"))
-    Random.shuffle(data)
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getSmall3TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = {
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Hi"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((3, 2L, "Hello world"))
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def get5TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, Int, String, Long)] = {
-    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
-    data.+=((1, 1L, 0, "Hallo", 1L))
-    data.+=((2, 2L, 1, "Hallo Welt", 2L))
-    data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
-    data.+=((3, 4L, 3, "Hallo Welt wie gehts?", 2L))
-    data.+=((3, 5L, 4, "ABC", 2L))
-    data.+=((3, 6L, 5, "BCD", 3L))
-    data.+=((4, 7L, 6, "CDE", 2L))
-    data.+=((4, 8L, 7, "DEF", 1L))
-    data.+=((4, 9L, 8, "EFG", 1L))
-    data.+=((4, 10L, 9, "FGH", 2L))
-    data.+=((5, 11L, 10, "GHI", 1L))
-    data.+=((5, 12L, 11, "HIJ", 3L))
-    data.+=((5, 13L, 12, "IJK", 3L))
-    data.+=((5, 14L, 13, "JKL", 2L))
-    data.+=((5, 15L, 14, "KLM", 2L))
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getSmall5TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, Int, String, Long)] = {
-    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
-    data.+=((1, 1L, 0, "Hallo", 1L))
-    data.+=((2, 2L, 1, "Hallo Welt", 2L))
-    data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getSmallNestedTupleDataSet(env: ExecutionEnvironment): DataSet[((Int, Int), String)] = {
-    val data = new mutable.MutableList[((Int, Int), String)]
-    data.+=(((1, 1), "one"))
-    data.+=(((2, 2), "two"))
-    data.+=(((3, 3), "three"))
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getGroupSortedNestedTupleDataSet(env: ExecutionEnvironment): DataSet[((Int, Int), String)] = {
-    val data = new mutable.MutableList[((Int, Int), String)]
-    data.+=(((1, 3), "a"))
-    data.+=(((1, 2), "a"))
-    data.+=(((2, 1), "a"))
-    data.+=(((2, 2), "b"))
-    data.+=(((3, 3), "c"))
-    data.+=(((3, 6), "c"))
-    data.+=(((4, 9), "c"))
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getStringDataSet(env: ExecutionEnvironment): DataSet[String] = {
-    val data = new mutable.MutableList[String]
-    data.+=("Hi")
-    data.+=("Hello")
-    data.+=("Hello world")
-    data.+=("Hello world, how are you?")
-    data.+=("I am fine.")
-    data.+=("Luke Skywalker")
-    data.+=("Random comment")
-    data.+=("LOL")
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getIntDataSet(env: ExecutionEnvironment): DataSet[Int] = {
-    val data = new mutable.MutableList[Int]
-    data.+=(1)
-    data.+=(2)
-    data.+=(2)
-    data.+=(3)
-    data.+=(3)
-    data.+=(3)
-    data.+=(4)
-    data.+=(4)
-    data.+=(4)
-    data.+=(4)
-    data.+=(5)
-    data.+=(5)
-    data.+=(5)
-    data.+=(5)
-    data.+=(5)
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getCustomTypeDataSet(env: ExecutionEnvironment): DataSet[CustomType] = {
-    val data = new mutable.MutableList[CustomType]
-    data.+=(new CustomType(1, 0L, "Hi"))
-    data.+=(new CustomType(2, 1L, "Hello"))
-    data.+=(new CustomType(2, 2L, "Hello world"))
-    data.+=(new CustomType(3, 3L, "Hello world, how are you?"))
-    data.+=(new CustomType(3, 4L, "I am fine."))
-    data.+=(new CustomType(3, 5L, "Luke Skywalker"))
-    data.+=(new CustomType(4, 6L, "Comment#1"))
-    data.+=(new CustomType(4, 7L, "Comment#2"))
-    data.+=(new CustomType(4, 8L, "Comment#3"))
-    data.+=(new CustomType(4, 9L, "Comment#4"))
-    data.+=(new CustomType(5, 10L, "Comment#5"))
-    data.+=(new CustomType(5, 11L, "Comment#6"))
-    data.+=(new CustomType(5, 12L, "Comment#7"))
-    data.+=(new CustomType(5, 13L, "Comment#8"))
-    data.+=(new CustomType(5, 14L, "Comment#9"))
-    data.+=(new CustomType(6, 15L, "Comment#10"))
-    data.+=(new CustomType(6, 16L, "Comment#11"))
-    data.+=(new CustomType(6, 17L, "Comment#12"))
-    data.+=(new CustomType(6, 18L, "Comment#13"))
-    data.+=(new CustomType(6, 19L, "Comment#14"))
-    data.+=(new CustomType(6, 20L, "Comment#15"))
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getSmallCustomTypeDataSet(env: ExecutionEnvironment): DataSet[CustomType] = {
-    val data = new mutable.MutableList[CustomType]
-    data.+=(new CustomType(1, 0L, "Hi"))
-    data.+=(new CustomType(2, 1L, "Hello"))
-    data.+=(new CustomType(2, 2L, "Hello world"))
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getSmallTuplebasedPojoMatchingDataSet(env: ExecutionEnvironment):
-      DataSet[(Int, String, Int, Int, Long, String, Long)] = {
-    val data = new mutable.MutableList[(Int, String, Int, Int, Long, String, Long)]
-    data.+=((1, "First", 10, 100, 1000L, "One", 10000L))
-    data.+=((2, "Second", 20, 200, 2000L, "Two", 20000L))
-    data.+=((3, "Third", 30, 300, 3000L, "Three", 30000L))
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getSmallPojoDataSet(env: ExecutionEnvironment): DataSet[POJO] = {
-    val data = new mutable.MutableList[POJO]
-    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
-    data.+=(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L))
-    data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L))
-    env.fromCollection(Random.shuffle(data))
-  }
-
-  def getDuplicatePojoDataSet(env: ExecutionEnvironment): DataSet[POJO] = {
-    val data = new mutable.MutableList[POJO]
-    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
-    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
-    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
-    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
-    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
-    data.+=(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L))
-    data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L))
-    data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L))
-    env.fromCollection(data)
-  }
-
-  def getCrazyNestedDataSet(env: ExecutionEnvironment): DataSet[CrazyNested] = {
-    val data = new mutable.MutableList[CrazyNested]
-    data.+=(new CrazyNested("aa"))
-    data.+=(new CrazyNested("bb"))
-    data.+=(new CrazyNested("bb"))
-    data.+=(new CrazyNested("cc"))
-    data.+=(new CrazyNested("cc"))
-    data.+=(new CrazyNested("cc"))
-    env.fromCollection(data)
-  }
-
-  def getPojoContainingTupleAndWritable(env: ExecutionEnvironment): DataSet[CollectionDataSets
-  .PojoContainingTupleAndWritable] = {
-    val data = new
-        mutable.MutableList[PojoContainingTupleAndWritable]
-    data.+=(new PojoContainingTupleAndWritable(1, 10L, 100L))
-    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
-    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
-    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
-    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
-    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
-    env.fromCollection(data)
-  }
-
-  def getGroupSortedPojoContainingTupleAndWritable(env: ExecutionEnvironment):
-  DataSet[PojoContainingTupleAndWritable] = {
-    val data = new mutable.MutableList[PojoContainingTupleAndWritable]
-    data.+=(new CollectionDataSets.PojoContainingTupleAndWritable(1, 10L, 100L))
-    data.+=(new CollectionDataSets.PojoContainingTupleAndWritable(2, 20L, 200L))
-    data.+=(new CollectionDataSets.PojoContainingTupleAndWritable(2, 20L, 201L))
-    data.+=(new CollectionDataSets.PojoContainingTupleAndWritable(2, 30L, 200L))
-    data.+=(new CollectionDataSets.PojoContainingTupleAndWritable(2, 30L, 600L))
-    data.+=(new CollectionDataSets.PojoContainingTupleAndWritable(2, 30L, 400L))
-    env.fromCollection(data)
-  }
-
-  def getTupleContainingPojos(env: ExecutionEnvironment): DataSet[(Int, CrazyNested, POJO)] = {
-    val data = new mutable.MutableList[(Int, CrazyNested, POJO)]
-    data.+=((
-      1,
-      new CrazyNested("one", "uno", 1L),
-      new POJO(1, "First", 10, 100, 1000L, "One", 10000L)))
-    data.+=((
-      1,
-      new CrazyNested("one", "uno", 1L),
-      new POJO(1, "First", 10, 100, 1000L, "One", 10000L)))
-    data.+=((
-      1,
-      new CrazyNested("one", "uno", 1L),
-      new POJO(1, "First", 10, 100, 1000L, "One", 10000L)))
-    data.+=((
-      2,
-      new CrazyNested("two", "duo", 2L),
-      new POJO(1, "First", 10, 100, 1000L, "One", 10000L)))
-    env.fromCollection(data)
-  }
-
-  def getSmallTuplebasedDataSetMatchingPojo(env: ExecutionEnvironment):
-    DataSet[(Long, Integer, Integer, Long, String, Integer, String)] = {
-    val data = new mutable.MutableList[(Long, Integer, Integer, Long, String, Integer, String)]
-    data.+=((10000L, 10, 100, 1000L, "One", 1, "First"))
-    data.+=((20000L, 20, 200, 2000L, "Two", 2, "Second"))
-    data.+=((30000L, 30, 300, 3000L, "Three", 3, "Third"))
-    env.fromCollection(data)
-  }
-
-  def getPojoWithMultiplePojos(env: ExecutionEnvironment): DataSet[CollectionDataSets
-  .PojoWithMultiplePojos] = {
-    val data = new mutable.MutableList[CollectionDataSets
-    .PojoWithMultiplePojos]
-    data.+=(new CollectionDataSets.PojoWithMultiplePojos("a", "aa", "b", "bb", 1))
-    data.+=(new CollectionDataSets.PojoWithMultiplePojos("b", "bb", "c", "cc", 2))
-    data.+=(new CollectionDataSets.PojoWithMultiplePojos("b", "bb", "c", "cc", 2))
-    data.+=(new CollectionDataSets.PojoWithMultiplePojos("b", "bb", "c", "cc", 2))
-    data.+=(new CollectionDataSets.PojoWithMultiplePojos("d", "dd", "e", "ee", 3))
-    data.+=(new CollectionDataSets.PojoWithMultiplePojos("d", "dd", "e", "ee", 3))
-    env.fromCollection(data)
-  }
-
-  case class MutableTuple3[T1, T2, T3](var _1: T1, var _2: T2, var _3: T3)
-
-  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
-    def this() {
-      this(0, 0, "")
-    }
-
-    override def toString: String = {
-      myInt + "," + myLong + "," + myString
-    }
-  }
-
-  class POJO(
-      var number: Int,
-      var str: String,
-      var nestedTupleWithCustom: (Int, CustomType),
-      var nestedPojo: NestedPojo) {
-    def this() {
-      this(0, "", null, null)
-    }
-
-    def this(i0: Int, s0: String, i1: Int, i2: Int, l0: Long, s1: String, l1: Long) {
-      this(i0, s0, (i1, new CustomType(i2, l0, s1)), new NestedPojo(l1))
-    }
-
-    override def toString: String = {
-      number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber
-    }
-
-    @transient var ignoreMe: Long = 1L
-  }
-
-  class NestedPojo(var longNumber: Long) {
-    def this() {
-      this(0)
-    }
-  }
-
-  class CrazyNested(var nest_Lvl1: CrazyNestedL1, var something: Long) {
-    def this() {
-      this(new CrazyNestedL1, 0)
-    }
-
-    def this(set: String) {
-      this()
-      nest_Lvl1 = new CrazyNestedL1
-      nest_Lvl1.nest_Lvl2 = new CrazyNestedL2
-      nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3
-      nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4
-      nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = set
-    }
-
-    def this(set: String, second: String, s: Long) {
-      this(set)
-      something = s
-      nest_Lvl1.a = second
-    }
-  }
-
-  class CrazyNestedL1 {
-    var a: String = null
-    var b: Int = 0
-    var nest_Lvl2: CrazyNestedL2 = null
-  }
-
-  class CrazyNestedL2 {
-    var nest_Lvl3: CrazyNestedL3 = null
-  }
-
-  class CrazyNestedL3 {
-    var nest_Lvl4: CrazyNestedL4 = null
-  }
-
-  class CrazyNestedL4 {
-    var f1nal: String = null
-  }
-
-  class PojoContainingTupleAndWritable(
-      var someInt: Int,
-      var someString: String,
-      var hadoopFan: IntWritable,
-      var theTuple: (Long, Long)) {
-    def this() {
-      this(0, "", new IntWritable(0), (0, 0))
-    }
-
-    def this(i: Int, l1: Long, l2: Long) {
-      this()
-      hadoopFan = new IntWritable(i)
-      someInt = i
-      theTuple = (l1, l2)
-    }
-
-  }
-
-  class Pojo1 {
-    var a: String = null
-    var b: String = null
-
-    override def toString = s"Pojo1 a=$a b=$b"
-  }
-
-  class Pojo2 {
-    var a2: String = null
-    var b2: String = null
-
-    override def toString = s"Pojo2 a2=$a2 b2=$b2"
-  }
-
-  class PojoWithMultiplePojos {
-
-    def this(a: String, b: String, a1: String, b1: String, i0: Int) {
-      this()
-      p1 = new Pojo1
-      p1.a = a
-      p1.b = b
-      p2 = new Pojo2
-      p2.a2 = a1
-      p2.b2 = b1
-      this.i0 = i0
-    }
-
-    var p1: Pojo1 = null
-    var p2: Pojo2 = null
-    var i0: Int = 0
-
-    override def toString = s"PojoWithMultiplePojos p1=$p1 p2=$p2 i0=$i0"
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 16dccb6..ff2b607 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -36,54 +36,63 @@ under the License.
 	<packaging>jar</packaging>
 
 	<dependencies>
+	
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
+			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-compiler</artifactId>
 			<version>${project.version}</version>
+			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime</artifactId>
 			<version>${project.version}</version>
+			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-clients</artifactId>
 			<version>${project.version}</version>
+			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
+			<scope>test</scope>
 		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>4.11</version>
-		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-scala</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java-examples</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-scala-examples</artifactId>
@@ -91,17 +100,162 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 		
-		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
-			<scope>provided</scope>
+			<scope>test</scope>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.scalatest</groupId>
+			<artifactId>scalatest_2.10</artifactId>
+			<version>2.2.0</version>
+			<scope>test</scope>
 		</dependency>
 	</dependencies>
 
 	<build>
 		<plugins>
+		
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+ 
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+			
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+		
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
@@ -113,6 +267,7 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+			
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-surefire-plugin</artifactId>
@@ -125,6 +280,7 @@ under the License.
 					</excludes>
 				</configuration>
 			</plugin>
+			
 			<plugin>
 				<artifactId>maven-failsafe-plugin</artifactId>
 				<configuration>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0ad9031/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
new file mode 100644
index 0000000..729fda4
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.junit.Test
+import org.apache.flink.api.common.InvalidProgramException
+
+// Verifies the sanity checks applied to delta iterations. Each test builds a dummy job
+// that is not meant to be executed; only the validation of the join/coGroup with the
+// solution set inside the iteration is exercised.
+class DeltaIterationSanityCheckTest extends Serializable {
+
+  @Test
+  def testCorrectJoinWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set (left input) joined on its key field "_1": must pass the check.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.join(ws).where("_1").equalTo("_1") { (l, r) => l }
+      (result, ws)
+    }
+
+    iteration.print()
+  }
+
+  @Test
+  def testCorrectJoinWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set (right input) joined on its key field "_1": must pass the check.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
+      (result, ws)
+    }
+
+    iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set (left input) joined on "_2" instead of its key "_1": must be rejected.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.join(ws).where("_2").equalTo("_2") { (l, r) => l }
+      (result, ws)
+    }
+
+    iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set (right input) joined on "_2" instead of its key "_1": must be rejected.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.join(s).where("_2").equalTo("_2") { (l, r) => l }
+      (result, ws)
+    }
+    iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set is keyed on "_2" but joined on "_1": must be rejected.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
+      val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
+      (result, ws)
+    }
+
+    iteration.print()
+  }
+
+  @Test
+  def testCorrectCoGroupWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set (left input) coGrouped on its key field "_1": must pass the check.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.coGroup(ws).where("_1").equalTo("_1") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    iteration.print()
+  }
+
+  @Test
+  def testCorrectCoGroupWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set (right input) coGrouped on its key field "_1": must pass the check.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set (left input) coGrouped on "_2" instead of its key "_1": must be rejected.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.coGroup(ws).where("_2").equalTo("_2") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set (right input) coGrouped on "_2" instead of its key "_1": must be rejected.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.coGroup(s).where("_2").equalTo("_2") { (l, r) => l.min }
+      (result, ws)
+    }
+    iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+    // Solution set is keyed on "_2" but coGrouped on "_1": must be rejected.
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
+      val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    iteration.print()
+  }
+}


Mime
View raw message