flink-commits mailing list archives

From aljos...@apache.org
Subject [3/4] git commit: [scala] Add missing operator IT Cases
Date Thu, 09 Oct 2014 16:43:36 GMT
[scala] Add missing operator IT Cases


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d749c24d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d749c24d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d749c24d

Branch: refs/heads/master
Commit: d749c24d9b6a1790f9572b72075e33b8ef6a83a8
Parents: f562d49
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Oct 9 15:51:24 2014 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Oct 9 18:35:22 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/operators/Keys.java   |   3 +-
 .../org/apache/flink/api/scala/DataSet.scala    |  11 +
 .../api/scala/codegen/TypeInformationGen.scala  |  42 +-
 .../api/scala/unfinishedKeyPairOperation.scala  |   2 +-
 .../api/scala/operators/AggregateITCase.scala   |   4 +-
 .../api/scala/operators/CoGroupITCase.scala     | 402 +++++++++
 .../flink/api/scala/operators/CrossITCase.scala | 239 ++++++
 .../api/scala/operators/FilterITCase.scala      | 173 ++++
 .../api/scala/operators/FirstNITCase.scala      | 116 +++
 .../api/scala/operators/FlatMapITCase.scala     | 219 +++++
 .../api/scala/operators/GroupReduceITCase.scala | 830 +++++++++++--------
 .../flink/api/scala/operators/JoinITCase.scala  | 376 +++++++++
 .../flink/api/scala/operators/MapITCase.scala   | 242 ++++++
 .../api/scala/operators/PartitionITCase.scala   |   2 +-
 .../api/scala/operators/ReduceITCase.scala      | 236 ++++++
 .../api/scala/operators/SumMinMaxITCase.scala   | 146 ++++
 .../flink/api/scala/operators/UnionITCase.scala | 126 +++
 .../api/scala/util/CollectionDataSets.scala     |  31 +-
 .../javaApiOperators/GroupReduceITCase.java     |   7 +-
 19 files changed, 2859 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 40ce238..74a70d5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,7 +115,7 @@ public abstract class Keys<T> {
 				
 				if(keyType.isTupleType()) {
 					// special case again:
-					TupleTypeInfo<?> tupleKeyType = (TupleTypeInfo<?>) keyType;
+					TupleTypeInfoBase<?> tupleKeyType = (TupleTypeInfoBase<?>) keyType;
 					List<FlatFieldDescriptor> keyTypeFields = new ArrayList<FlatFieldDescriptor>(tupleKeyType.getTotalFields());
 					tupleKeyType.getKey(ExpressionKeys.SELECT_ALL_CHAR, 0, keyTypeFields);
 					if(expressionKeys.keyFields.size() != keyTypeFields.size()) {
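
The widened cast matters for the new Scala tests below: a Scala key selector that returns a tuple or case class carries a CaseClassTypeInfo, which extends TupleTypeInfoBase but is not a Java TupleTypeInfo, so the narrower cast failed for exactly those keys. A minimal sketch of the pattern this unblocks, assuming the 0.7-era Scala API (names and values are illustrative):

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object TupleKeySketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val left = env.fromElements((1, 1L, 0, "Hallo", 1L), (2, 2L, 1, "Hallo Welt", 2L))
    val right = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"))
    // Both selectors return Scala tuples, whose type information is a
    // CaseClassTypeInfo: a TupleTypeInfoBase, but not a (Java) TupleTypeInfo.
    val coGrouped = left.coGroup(right)
      .where(t => (t._1, t._5))
      .equalTo(t => (t._1, t._2))
      .apply { (first, second, out: Collector[(Int, String)]) =>
        for (r <- second) out.collect((r._1, r._3))
      }
    coGrouped.print()
    env.execute()
  }
}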

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 7a2c699..0f7f723 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -221,6 +222,16 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     this
   }
 
+  def withParameters(parameters: Configuration): DataSet[T] = {
+    javaSet match {
+      case udfOp: UdfOperator[_] => udfOp.withParameters(parameters)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " +
+          "parameters")
+    }
+    this
+  }
+
   // --------------------------------------------------------------------------------------------
   //  Filter & Transformations
   // --------------------------------------------------------------------------------------------
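
The new withParameters hook mirrors the Java API's operator parameters: the given Configuration is handed to a rich function's open() method at runtime. A minimal usage sketch, assuming the same era's API (the "threshold" key and the values are made up for illustration):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object WithParametersSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val conf = new Configuration()
    conf.setInteger("threshold", 5)
    // The Configuration given to withParameters arrives as the `parameters`
    // argument of the rich function's open() method.
    val clamped = env.fromElements(1, 7, 3, 9)
      .map(new RichMapFunction[Int, Int] {
        private var threshold = 0
        override def open(parameters: Configuration): Unit = {
          threshold = parameters.getInteger("threshold", 0)
        }
        override def map(value: Int): Int = math.min(value, threshold)
      })
      .withParameters(conf)
    clamped.print()
    env.execute()
  }
}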

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index 0686668..187ec7d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.api.scala.codegen
 
+import java.lang.reflect.{Field, Modifier}
+
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -29,6 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.hadoop.io.Writable
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import scala.reflect.macros.Context
 
@@ -154,17 +157,40 @@ private[flink] trait TypeInformationGen[C <: Context] {
       val fields =  fieldsList.splice
       val clazz: Class[T] = tpeClazz.splice
 
-      val fieldMap = TypeExtractor.getAllDeclaredFields(clazz).asScala map {
-        f => (f.getName, f)
-      } toMap
-
-      val pojoFields = fields map {
-        case (fName, fTpe) =>
-          new PojoField(fieldMap(fName), fTpe)
+      var traversalClazz: Class[_] = clazz
+      val clazzFields = mutable.Map[String, Field]()
+
+      var error = false
+      while (traversalClazz != null) {
+        for (field <- traversalClazz.getDeclaredFields) {
+          if (clazzFields.contains(field.getName)) {
+            println(s"The field $field is already contained in the " +
+              s"hierarchy of the class ${clazz}. Please use unique field names throughout " +
+              "your class hierarchy")
+            error = true
+          }
+          clazzFields += (field.getName -> field)
+        }
+        traversalClazz = traversalClazz.getSuperclass
       }
 
-      new PojoTypeInfo(clazz, pojoFields.asJava)
+      if (error) {
+        new GenericTypeInfo(clazz)
+      } else {
+        val pojoFields = fields flatMap {
+          case (fName, fTpe) =>
+            val field = clazzFields(fName)
+            if (Modifier.isTransient(field.getModifiers) || Modifier.isStatic(field.getModifiers)) {
+              // ignore transient and static fields
+              // the TypeAnalyzer for some reason does not always detect transient fields
+              None
+            } else {
+              Some(new PojoField(clazzFields(fName), fTpe))
+            }
+        }
 
+        new PojoTypeInfo(clazz, pojoFields.asJava)
+      }
     }
   }
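
The generated code above walks the whole superclass chain, flags duplicate (shadowed) field names, and drops transient and static fields when building the PojoFields. A plain-function restatement of that logic, runnable outside the macro context (the function name is ours, for illustration):

import java.lang.reflect.{Field, Modifier}
import scala.collection.mutable

object FieldAnalysisSketch {
  // Returns None when a field name occurs more than once in the class
  // hierarchy, mirroring the macro's fallback to GenericTypeInfo; otherwise
  // the fields that would back the generated PojoFields.
  def analyzeFields(clazz: Class[_]): Option[List[Field]] = {
    var current: Class[_] = clazz
    val byName = mutable.Map[String, Field]()
    var duplicate = false
    while (current != null) {
      for (f <- current.getDeclaredFields) {
        if (byName.contains(f.getName)) duplicate = true // shadowed name
        byName += (f.getName -> f)
      }
      current = current.getSuperclass
    }
    if (duplicate) None
    else Some(byName.values.toList.filterNot(f =>
      Modifier.isTransient(f.getModifiers) || Modifier.isStatic(f.getModifiers)))
  }
}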
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
index b2929b9..e0d863d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
@@ -131,7 +131,7 @@ private[flink] class HalfUnfinishedKeyPairOperation[L, R, O](
    * Specify the key selector function for the right side of the key based operation. This returns
    * the finished operation.
    */
-  def equalTo[K: TypeInformation](fun: (R) => K) = {
+  def equalTo[K: TypeInformation](fun: (R) => K): O = {
     val keyType = implicitly[TypeInformation[K]]
     val keyExtractor = new KeySelector[R, K] {
       def getKey(in: R) = fun(in)
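
The only change here is the explicit return type: where() yields the half-finished key-pair operation, and equalTo() now visibly returns the finished operation O. A small join sketch in that style, assuming the same Scala API (the case classes are illustrative):

import org.apache.flink.api.scala._

object KeyPairSketch {
  case class User(id: Int, name: String)
  case class Order(userId: Int, amount: Double)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val users  = env.fromElements(User(1, "ada"), User(2, "bob"))
    val orders = env.fromElements(Order(1, 10.0), Order(1, 2.5))
    // where() gives the half-finished operation; equalTo() finishes it, and
    // the function block is applied to the finished join.
    val joined: DataSet[(String, Double)] =
      users.join(orders).where(_.id).equalTo(_.userId) {
        (u, o) => (u.name, o.amount)
      }
    joined.print()
    env.execute()
  }
}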

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 0e3f2ed..ad22fa6 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -117,7 +117,7 @@ class AggregateITCase(config: Configuration) extends JavaProgramTestBase(config)
   }
 
   protected def testProgram(): Unit = {
-    expectedResult = DistinctProgs.runProgram(curProgId, resultPath)
+    expectedResult = AggregateProgs.runProgram(curProgId, resultPath)
   }
 
   protected override def postSubmit(): Unit = {
@@ -129,7 +129,7 @@ object AggregateITCase {
   @Parameters
   def getConfigurations: java.util.Collection[Array[AnyRef]] = {
     val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to DistinctProgs.NUM_PROGRAMS) {
+    for (i <- 1 to AggregateProgs.NUM_PROGRAMS) {
       val config = new Configuration()
       config.setInteger("ProgramId", i)
       configs += Array(config)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
new file mode 100644
index 0000000..d4ce3b7
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.apache.flink.api.common.functions.RichCoGroupFunction
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.util.Collector
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import org.junit.Assert
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.flink.api.scala._
+
+
+object CoGroupProgs {
+  var NUM_PROGRAMS: Int = 13
+
+  def runProgram(progId: Int, resultPath: String): String = {
+    progId match {
+      case 1 =>
+        /*
+         * CoGroup on tuples with key field selector
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get5TupleDataSet(env)
+        val ds2 = CollectionDataSets.get5TupleDataSet(env)
+        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
+          (first, second) =>
+            var sum = 0
+            var id = 0
+            for (t <- first) {
+              sum += t._3
+              id = t._1
+            }
+            for (t <- second) {
+              sum += t._3
+              id = t._1
+            }
+            (id, sum)
+        }
+        coGroupDs.writeAsCsv(resultPath)
+        env.execute()
+        "1,0\n" + "2,6\n" + "3,24\n" + "4,60\n" + "5,120\n"
+
+      case 2 =>
+        /*
+         * CoGroup on two custom type inputs with key extractors
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getCustomTypeDataSet(env)
+        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+
+        val coGroupDs = ds.coGroup(ds2).where(_.myInt).equalTo(_.myInt) apply {
+          (first, second) =>
+            val o = new CustomType(0, 0, "test")
+            for (c <- first) {
+              o.myInt = c.myInt
+              o.myLong += c.myLong
+            }
+            for (c <- second) {
+              o.myInt = c.myInt
+              o.myLong += c.myLong
+            }
+            o
+        }
+        coGroupDs.writeAsText(resultPath)
+        env.execute()
+        "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" + "6," +
+          "210,test\n"
+
+      case 3 =>
+        /*
+         * check correctness of cogroup if UDF returns left input objects
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val ds2 = CollectionDataSets.get3TupleDataSet(env)
+        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
+          (first, second, out: Collector[(Int, Long, String)] ) =>
+            for (t <- first) {
+              if (t._1 < 6) {
+                out.collect(t)
+              }
+            }
+        }
+        coGroupDs.writeAsCsv(resultPath)
+        env.execute()
+        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+          "how are you?\n" + "5,3,I am fine.\n"
+
+      case 4 =>
+        /*
+         * check correctness of cogroup if UDF returns right input objects
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get5TupleDataSet(env)
+        val ds2 = CollectionDataSets.get5TupleDataSet(env)
+        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
+          (first, second, out: Collector[(Int, Long, Int, String, Long)]) =>
+            for (t <- second) {
+              if (t._1 < 4) {
+                out.collect(t)
+              }
+            }
+        }
+        coGroupDs.writeAsCsv(resultPath)
+        env.execute()
+        "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt,2\n" + "2,3,2,Hallo Welt wie,1\n" + "3,4,3," +
+          "Hallo Welt wie gehts?,2\n" + "3,5,4,ABC,2\n" + "3,6,5,BCD,3\n"
+
+      case 5 =>
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val intDs = CollectionDataSets.getIntDataSet(env)
+        val ds = CollectionDataSets.get5TupleDataSet(env)
+        val ds2 = CollectionDataSets.get5TupleDataSet(env)
+        val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).apply(
+          new RichCoGroupFunction[
+            (Int, Long, Int, String, Long),
+            (Int, Long, Int, String, Long),
+            (Int, Int, Int)] {
+            private var broadcast = 41
+
+            override def open(config: Configuration) {
+              val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+              broadcast = ints.sum
+            }
+
+            override def coGroup(
+                first: java.lang.Iterable[(Int, Long, Int, String, Long)],
+                second: java.lang.Iterable[(Int, Long, Int, String, Long)],
+                out: Collector[(Int, Int, Int)]): Unit = {
+              var sum = 0
+              var id = 0
+              for (t <- first.asScala) {
+                sum += t._3
+                id = t._1
+              }
+              for (t <- second.asScala) {
+                sum += t._3
+                id = t._1
+              }
+              out.collect((id, sum, broadcast))
+            }
+
+        }).withBroadcastSet(intDs, "ints")
+        coGroupDs.writeAsCsv(resultPath)
+        env.execute()
+        "1,0,55\n" + "2,6,55\n" + "3,24,55\n" + "4,60,55\n" + "5,120,55\n"
+
+      case 6 =>
+        /*
+         * CoGroup on a tuple input with key field selector and a custom type input with
+         * key extractor
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get5TupleDataSet(env)
+        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+        val coGroupDs = ds.coGroup(ds2).where(2).equalTo(_.myInt) apply {
+          (first, second) =>
+            var sum = 0L
+            var id = 0
+            for (t <- first) {
+              sum += t._1
+              id = t._3
+            }
+            for (t <- second) {
+              sum += t.myLong
+              id = t.myInt
+            }
+            (id, sum, "test")
+        }
+        coGroupDs.writeAsCsv(resultPath)
+        env.execute()
+        "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" + "5," +
+          "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
+          "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
+
+      case 7 =>
+        /*
+         * CoGroup on a tuple input with key field selector and a custom type input with
+         * key extractor
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get5TupleDataSet(env)
+        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+        val coGroupDs = ds2.coGroup(ds).where(_.myInt).equalTo(2) {
+          (first, second) =>
+            var sum = 0L
+            var id = 0
+            for (t <- first) {
+              sum += t.myLong
+              id = t.myInt
+            }
+            for (t <- second) {
+              sum += t._1
+              id = t._3
+            }
+
+            new CustomType(id, sum, "test")
+        }
+        coGroupDs.writeAsText(resultPath)
+        env.execute()
+        "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" + "5," +
+          "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
+          "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
+
+      case 8 =>
+        /*
+         * CoGroup with multiple key fields
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds1 = CollectionDataSets.get5TupleDataSet(env)
+        val ds2 = CollectionDataSets.get3TupleDataSet(env)
+        val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) {
+          (first, second, out: Collector[(Int, Long, String)]) =>
+            val strs = first map(_._4)
+            for (t <- second) {
+              for (s <- strs) {
+                out.collect((t._1, t._2, s))
+              }
+            }
+        }
+
+        coGrouped.writeAsCsv(resultPath)
+        env.execute()
+        "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
+          "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
+
+      case 9 =>
+        /*
+         * CoGroup with multiple key fields
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds1 = CollectionDataSets
+          .get5TupleDataSet(env)
+        val ds2 = CollectionDataSets.get3TupleDataSet(env)
+        val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2))
+          .apply {
+          (first, second, out: Collector[(Int, Long, String)]) =>
+            val strs = first map(_._4)
+            for (t <- second) {
+              for (s <- strs) {
+                out.collect((t._1, t._2, s))
+              }
+            }
+        }
+
+        coGrouped.writeAsCsv(resultPath)
+        env.execute()
+        "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
+          "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
+
+      case 10 =>
+        /*
+         * CoGroup on two custom type inputs using expression keys
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getCustomTypeDataSet(env)
+        val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+        val coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt") {
+          (first, second) =>
+            val o = new CustomType(0, 0, "test")
+            for (t <- first) {
+              o.myInt = t.myInt
+              o.myLong += t.myLong
+            }
+            for (t <- second) {
+              o.myInt = t.myInt
+              o.myLong += t.myLong
+            }
+            o
+        }
+        coGroupDs.writeAsText(resultPath)
+        env.execute()
+        "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" + "6," +
+          "210,test\n"
+
+      case 11 =>
+        /*
+         * CoGroup on two custom type inputs using expression keys
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmallPojoDataSet(env)
+        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+        val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) {
+          (first, second, out: Collector[CustomType]) =>
+            for (p <- first) {
+              for (t <- second) {
+                Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+                out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+              }
+            }
+        }
+        coGroupDs.writeAsText(resultPath)
+        env.execute()
+        "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
+
+      case 12 =>
+        /*
+         * CoGroup field-selector (expression keys) + key selector function
+         * The key selector is unnecessarily complicated (Tuple1) ;)
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmallPojoDataSet(env)
+        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+        val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) {
+          (first, second, out: Collector[CustomType]) =>
+            for (p <- first) {
+              for (t <- second) {
+                Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+                out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+              }
+            }
+        }
+        coGroupDs.writeAsText(resultPath)
+        env.execute()
+        "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
+
+      case 13 =>
+        /*
+         * CoGroup field-selector (expression keys) + key selector function
+         * The key selector is simple here
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmallPojoDataSet(env)
+        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+        val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) {
+          (first, second, out: Collector[CustomType]) =>
+            for (p <- first) {
+              for (t <- second) {
+                Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+                out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+              }
+            }
+        }
+        coGroupDs.writeAsText(resultPath)
+        env.execute()
+        "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
+
+      case _ =>
+        throw new IllegalArgumentException("Invalid program id")
+    }
+  }
+}
+
+
+@RunWith(classOf[Parameterized])
+class CoGroupITCase(config: Configuration) extends JavaProgramTestBase(config) {
+
+  private val curProgId: Int = config.getInteger("ProgramId", -1)
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  protected override def preSubmit(): Unit = {
+    resultPath = getTempDirPath("result")
+  }
+
+  protected def testProgram(): Unit = {
+    expectedResult = CoGroupProgs.runProgram(curProgId, resultPath)
+  }
+
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+}
+
+object CoGroupITCase {
+  @Parameters
+  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
+    val configs = mutable.MutableList[Array[AnyRef]]()
+    for (i <- 1 to CoGroupProgs.NUM_PROGRAMS) {
+      val config = new Configuration()
+      config.setInteger("ProgramId", i)
+      configs += Array(config)
+    }
+
+    configs.asJavaCollection
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
new file mode 100644
index 0000000..bcb85e5
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.apache.flink.api.common.functions.RichCrossFunction
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.flink.api.scala._
+
+
+object CrossProgs {
+  var NUM_PROGRAMS: Int = 9
+
+  def runProgram(progId: Int, resultPath: String): String = {
+    progId match {
+      case 1 =>
+        /*
+         * check correctness of cross on two tuple inputs
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+        val crossDs = ds.cross(ds2) { (l, r) => (l._3 + r._3, l._4 + r._4) }
+        crossDs.writeAsCsv(resultPath)
+        env.execute()
+
+        "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1," +
+          "Hallo WeltHallo\n" + "2,Hallo WeltHallo Welt\n" + "3,Hallo WeltHallo Welt wie\n" + "2," +
+          "Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4," +
+          "Hallo Welt wieHallo Welt wie\n"
+
+      case 2 =>
+        /*
+         * check correctness of cross if UDF returns left input object
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
+        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+        val crossDs = ds.cross(ds2) { (l, r ) => l }
+        crossDs.writeAsCsv(resultPath)
+        env.execute()
+
+        "1,1,Hi\n" + "1,1,Hi\n" + "1,1,Hi\n" + "2,2,Hello\n" + "2,2,Hello\n" + "2,2," +
+          "Hello\n" + "3,2,Hello world\n" + "3,2,Hello world\n" + "3,2,Hello world\n"
+
+      case 3 =>
+        /*
+         * check correctness of cross if UDF returns right input object
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
+        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+        val crossDs = ds.cross(ds2) { (l, r) => r }
+        crossDs.writeAsCsv(resultPath)
+        env.execute()
+
+        "1,1,0,Hallo,1\n" + "1,1,0,Hallo,1\n" + "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt," +
+          "2\n" + "2,2,1,Hallo Welt,2\n" + "2,2,1,Hallo Welt,2\n" + "2,3,2,Hallo Welt wie," +
+          "1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,3,2,Hallo Welt wie,1\n"
+
+      case 4 =>
+        /*
+         * check correctness of cross with broadcast set
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val intDs = CollectionDataSets.getIntDataSet(env)
+        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+        val crossDs = ds.cross(ds2).apply (
+          new RichCrossFunction[
+          (Int, Long, Int, String, Long),
+          (Int, Long, Int, String, Long),
+          (Int, Int, Int)] {
+          private var broadcast = 41
+
+          override def open(config: Configuration) {
+            val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+            broadcast = ints.sum
+          }
+
+          override def cross(
+              first: (Int, Long, Int, String, Long),
+              second: (Int, Long, Int, String, Long)): (Int, Int, Int) = {
+            (first._1 + second._1, first._3.toInt * second._3.toInt, broadcast)
+          }
+
+        })withBroadcastSet(intDs, "ints")
+        crossDs.writeAsCsv(resultPath)
+        env.execute()
+        "2,0,55\n" + "3,0,55\n" + "3,0,55\n" + "3,0,55\n" + "4,1,55\n" + "4,2,55\n" + "3," +
+          "0,55\n" + "4,2,55\n" + "4,4,55\n"
+
+      case 5 =>
+        /*
+         * check correctness of crossWithHuge (only correctness of result -> should be the same
+         * as with normal cross)
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+        val crossDs = ds.crossWithHuge(ds2) { (l, r) => (l._3 + r._3, l._4 + r._4)}
+        crossDs.writeAsCsv(resultPath)
+        env.execute()
+        "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1," +
+          "Hallo WeltHallo\n" + "2,Hallo WeltHallo Welt\n" + "3,Hallo WeltHallo Welt wie\n" + "2," +
+          "Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4," +
+          "Hallo Welt wieHallo Welt wie\n"
+
+      case 6 =>
+        /*
+         * check correctness of crossWithTiny (only correctness of result -> should be the same
+         * as with normal cross)
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets
+          .getSmall5TupleDataSet(env)
+        val ds2 = CollectionDataSets
+          .getSmall5TupleDataSet(env)
+        val crossDs = ds.crossWithTiny(ds2) { (l, r) => (l._3 + r._3, l._4 + r._4)}
+        crossDs.writeAsCsv(resultPath)
+        env.execute()
+        "0,HalloHallo\n" + "1,HalloHallo Welt\n" + "2,HalloHallo Welt wie\n" + "1," +
+          "Hallo WeltHallo\n" + "2,Hallo WeltHallo Welt\n" + "3,Hallo WeltHallo Welt wie\n" + "2," +
+          "Hallo Welt wieHallo\n" + "3,Hallo Welt wieHallo Welt\n" + "4," +
+          "Hallo Welt wieHallo Welt wie\n"
+
+      case 7 => // 9 in Java CrossITCase
+        /*
+         * check correctness of default cross
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
+        val ds2 = CollectionDataSets.getSmall5TupleDataSet(env)
+        val crossDs = ds.cross(ds2)
+        crossDs.writeAsCsv(resultPath)
+        env.execute()
+        "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" + "(1,1,Hi),(1,1,0,Hallo,1)\n" + "(1,1,Hi),(2,3," +
+          "2,Hallo Welt wie,1)\n" + "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" + "(2,2,Hello),(1,1,0," +
+          "Hallo,1)\n" + "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" + "(3,2,Hello world),(2,2,1," +
+          "Hallo Welt,2)\n" + "(3,2,Hello world),(1,1,0,Hallo,1)\n" + "(3,2,Hello world),(2,3,2," +
+          "Hallo Welt wie,1)\n"
+
+      case 8 => // 10 in Java CrossITCase
+        /*
+         * check correctness of cross on two custom type inputs
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmallCustomTypeDataSet(env)
+        val ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env)
+        val crossDs = ds.cross(ds2) {
+          (l, r) => new CustomType(l.myInt * r.myInt, l.myLong + r.myLong, l.myString + r.myString)
+        }
+        crossDs.writeAsText(resultPath)
+        env.execute()
+        "1,0,HiHi\n" + "2,1,HiHello\n" + "2,2,HiHello world\n" + "2,1,HelloHi\n" + "4,2," +
+          "HelloHello\n" + "4,3,HelloHello world\n" + "2,2,Hello worldHi\n" + "4,3," +
+          "Hello worldHello\n" + "4,4,Hello worldHello world"
+
+      case 9 => // 11 in Java CrossITCase
+        /*
+         * check correctness of cross a tuple input and a custom type input
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getSmall5TupleDataSet(env)
+        val ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env)
+        val crossDs = ds.cross(ds2) {
+          (l, r) => (l._1 + r.myInt, l._3 * r.myLong, l._4 + r.myString)
+        }
+        crossDs.writeAsCsv(resultPath)
+        env.execute()
+        "2,0,HalloHi\n" + "3,0,HalloHello\n" + "3,0,HalloHello world\n" + "3,0," +
+          "Hallo WeltHi\n" + "4,1,Hallo WeltHello\n" + "4,2,Hallo WeltHello world\n" + "3,0," +
+          "Hallo Welt wieHi\n" + "4,2,Hallo Welt wieHello\n" + "4,4,Hallo Welt wieHello world\n"
+
+      case _ =>
+        throw new IllegalArgumentException("Invalid program id")
+    }
+  }
+}
+
+
+@RunWith(classOf[Parameterized])
+class CrossITCase(config: Configuration) extends JavaProgramTestBase(config) {
+
+  private var curProgId: Int = config.getInteger("ProgramId", -1)
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  protected override def preSubmit(): Unit = {
+    resultPath = getTempDirPath("result")
+  }
+
+  protected def testProgram(): Unit = {
+    expectedResult = CrossProgs.runProgram(curProgId, resultPath)
+  }
+
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+}
+
+object CrossITCase {
+  @Parameters
+  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
+    val configs = mutable.MutableList[Array[AnyRef]]()
+    for (i <- 1 to CrossProgs.NUM_PROGRAMS) {
+      val config = new Configuration()
+      config.setInteger("ProgramId", i)
+      configs += Array(config)
+    }
+
+    configs.asJavaCollection
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
new file mode 100644
index 0000000..973028b
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.flink.api.scala._
+
+
+object FilterProgs {
+  var NUM_PROGRAMS: Int = 7
+
+  def runProgram(progId: Int, resultPath: String): String = {
+    progId match {
+      case 1 =>
+        /*
+         * Test all-rejecting filter.
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val filterDs = ds.filter( t => false )
+        filterDs.writeAsCsv(resultPath)
+        env.execute()
+        "\n"
+
+      case 2 =>
+        /*
+         * Test all-passing filter.
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val filterDs = ds.filter( t => true )
+        filterDs.writeAsCsv(resultPath)
+        env.execute()
+        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+          "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+          "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+          "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+          "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+          "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+      case 3 =>
+        /*
+         * Test filter on String tuple field.
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val filterDs = ds.filter( _._3.contains("world") )
+        filterDs.writeAsCsv(resultPath)
+        env.execute()
+        "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+
+      case 4 =>
+        /*
+         * Test filter on Integer tuple field.
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val filterDs = ds.filter( _._1 % 2 == 0 )
+        filterDs.writeAsCsv(resultPath)
+        env.execute()
+        "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+          "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+          "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+
+      case 5 =>
+        /*
+         * Test filter on basic type
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getStringDataSet(env)
+        val filterDs = ds.filter( _.startsWith("H") )
+        filterDs.writeAsText(resultPath)
+        env.execute()
+        "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
+
+      case 6 =>
+        /*
+         * Test filter on custom type
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getCustomTypeDataSet(env)
+        val filterDs = ds.filter( _.myString.contains("a") )
+        filterDs.writeAsText(resultPath)
+        env.execute()
+        "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+
+      case 7 =>
+        /*
+         * Test filter on String tuple field.
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ints = CollectionDataSets.getIntDataSet(env)
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val filterDs = ds.filter( new RichFilterFunction[(Int, Long, String)] {
+          var literal = -1
+          override def open(config: Configuration): Unit = {
+            val ints = getRuntimeContext.getBroadcastVariable[Int]("ints")
+            for (i <- ints.asScala) {
+              literal = if (literal < i) i else literal
+            }
+          }
+          override def filter(value: (Int, Long, String)): Boolean = value._1 < literal
+        }).withBroadcastSet(ints, "ints")
+        filterDs.writeAsCsv(resultPath)
+        env.execute()
+        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+
+      case _ =>
+        throw new IllegalArgumentException("Invalid program id")
+    }
+  }
+}
+
+
+@RunWith(classOf[Parameterized])
+class FilterITCase(config: Configuration) extends JavaProgramTestBase(config) {
+
+  private var curProgId: Int = config.getInteger("ProgramId", -1)
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  protected override def preSubmit(): Unit = {
+    resultPath = getTempDirPath("result")
+  }
+
+  protected def testProgram(): Unit = {
+    expectedResult = FilterProgs.runProgram(curProgId, resultPath)
+  }
+
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+}
+
+object FilterITCase {
+  @Parameters
+  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
+    val configs = mutable.MutableList[Array[AnyRef]]()
+    for (i <- 1 to FilterProgs.NUM_PROGRAMS) {
+      val config = new Configuration()
+      config.setInteger("ProgramId", i)
+      configs += Array(config)
+    }
+
+    configs.asJavaCollection
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
new file mode 100644
index 0000000..6882885
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.flink.api.scala._
+
+
+object FirstNProgs {
+  var NUM_PROGRAMS: Int = 3
+
+  def runProgram(progId: Int, resultPath: String): String = {
+    progId match {
+      case 1 =>
+        /*
+         * First-n on ungrouped data set
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val seven = ds.first(7).map( t => new Tuple1(1) ).sum(0)
+        seven.writeAsText(resultPath)
+        env.execute()
+        "(7)\n"
+
+      case 2 =>
+        /*
+         * First-n on grouped data set
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val first = ds.groupBy(1).first(4).map( t => (t._2, 1)).groupBy(0).sum(1)
+        first.writeAsText(resultPath)
+        env.execute()
+        "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n"
+
+      case 3 =>
+        /*
+         * First-n on grouped and sorted data set
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val first = ds.groupBy(1)
+          .sortGroup(0, Order.DESCENDING)
+          .first(3)
+          .map ( t => (t._2, t._1))
+        first.writeAsText(resultPath)
+        env.execute()
+        "(1,1)\n" + "(2,3)\n(2,2)\n" + "(3,6)\n(3,5)\n(3,4)\n" + "(4,10)\n(4,9)\n(4," +
+          "8)\n" + "(5,15)\n(5,14)\n(5,13)\n" + "(6,21)\n(6,20)\n(6,19)\n"
+
+      case _ =>
+        throw new IllegalArgumentException("Invalid program id")
+    }
+  }
+}
+
+
+@RunWith(classOf[Parameterized])
+class FirstNITCase(config: Configuration) extends JavaProgramTestBase(config) {
+
+  private var curProgId: Int = config.getInteger("ProgramId", -1)
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  protected override def preSubmit(): Unit = {
+    resultPath = getTempDirPath("result")
+  }
+
+  protected def testProgram(): Unit = {
+    expectedResult = FirstNProgs.runProgram(curProgId, resultPath)
+  }
+
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+}
+
+object FirstNITCase {
+  @Parameters
+  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
+    val configs = mutable.MutableList[Array[AnyRef]]()
+    for (i <- 1 to FirstNProgs.NUM_PROGRAMS) {
+      val config = new Configuration()
+      config.setInteger("ProgramId", i)
+      configs += Array(config)
+    }
+
+    configs.asJavaCollection
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d749c24d/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
new file mode 100644
index 0000000..0d80d22
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.util.Collector
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.flink.api.scala._
+
+
+object FlatMapProgs {
+  var NUM_PROGRAMS: Int = 7
+
+  def runProgram(progId: Int, resultPath: String): String = {
+    progId match {
+      case 1 =>
+        /*
+         * Test non-passing flatmap
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getStringDataSet(env)
+        val nonPassingFlatMapDs = ds.flatMap( in => if (in.contains("banana")) Some(in) else None )
+        nonPassingFlatMapDs.writeAsText(resultPath)
+        env.execute()
+        "\n"
+        
+      case 2 =>
+        /*
+         * Test data duplicating flatmap
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getStringDataSet(env)
+        val duplicatingFlatMapDs = ds.flatMap( in => Seq(in, in.toUpperCase) )
+        duplicatingFlatMapDs.writeAsText(resultPath)
+        env.execute()
+        "Hi\n" + "HI\n" + "Hello\n" + "HELLO\n" + "Hello world\n" + "HELLO WORLD\n" +
+          "Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" + "I am fine.\n" + "I AM " +
+          "FINE.\n" + "Luke Skywalker\n" + "LUKE SKYWALKER\n" + "Random comment\n" + "RANDOM " +
+          "COMMENT\n" + "LOL\n" + "LOL\n"
+
+      case 3 =>
+        /*
+         * Test flatmap with varying number of emitted tuples
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val varyingTuplesMapDs = ds.flatMap {
+          in =>
+            val numTuples = in._1 % 3
+            (0 until numTuples) map { i => in }
+        }
+        varyingTuplesMapDs.writeAsCsv(resultPath)
+        env.execute()
+        "1,1,Hi\n" + "2,2,Hello\n" + "2,2,Hello\n" + "4,3,Hello world, " +
+          "how are you?\n" + "5,3,I am fine.\n" + "5,3,I am fine.\n" + "7,4,Comment#1\n" + "8,4," +
+          "Comment#2\n" + "8,4,Comment#2\n" + "10,4,Comment#4\n" + "11,5,Comment#5\n" + "11,5," +
+          "Comment#5\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "14,5,Comment#8\n" + "16,6," +
+          "Comment#10\n" + "17,6,Comment#11\n" + "17,6,Comment#11\n" + "19,6,Comment#13\n" + "20," +
+          "6,Comment#14\n" + "20,6,Comment#14\n"
+
+      case 4 =>
+        /*
+         * Test type conversion flatmapper (Custom -> Tuple)
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.getCustomTypeDataSet(env)
+        val typeConversionFlatMapDs = ds.flatMap { in => Some((in.myInt, in.myLong, in.myString)) }
+        typeConversionFlatMapDs.writeAsCsv(resultPath)
+        env.execute()
+        "1,0,Hi\n" + "2,1,Hello\n" + "2,2,Hello world\n" + "3,3,Hello world, " +
+          "how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + "4,6," +
+          "Comment#1\n" + "4,7,Comment#2\n" + "4,8,Comment#3\n" + "4,9,Comment#4\n" + "5,10," +
+          "Comment#5\n" + "5,11,Comment#6\n" + "5,12,Comment#7\n" + "5,13,Comment#8\n" + "5,14," +
+          "Comment#9\n" + "6,15,Comment#10\n" + "6,16,Comment#11\n" + "6,17,Comment#12\n" + "6," +
+          "18,Comment#13\n" + "6,19,Comment#14\n" + "6,20,Comment#15\n"
+
+      case 5 =>
+        /*
+         * Test type conversion flatmapper (Tuple -> Basic)
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val typeConversionFlatMapDs = ds.flatMap ( in => Some(in._3) )
+        typeConversionFlatMapDs.writeAsText(resultPath)
+        env.execute()
+        "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
+          ".\n" + "Luke Skywalker\n" + "Comment#1\n" + "Comment#2\n" + "Comment#3\n" +
+          "Comment#4\n" + "Comment#5\n" + "Comment#6\n" + "Comment#7\n" + "Comment#8\n" +
+          "Comment#9\n" + "Comment#10\n" + "Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
+          "Comment#14\n" + "Comment#15\n"
+
+      case 6 =>
+        /*
+         * Test flatmapper if UDF returns input object
+         * multiple times and changes it in between
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ds = CollectionDataSets.get3TupleDataSet(env).map {
+          t => MutableTuple3(t._1, t._2, t._3)
+        }
+        val inputObjFlatMapDs = ds.flatMap {
+          (in, out: Collector[MutableTuple3[Int, Long, String]]) =>
+            val numTuples = in._1 % 4
+            (0 until numTuples) foreach { i => in._1 = i; out.collect(in) }
+        }
+        inputObjFlatMapDs.writeAsCsv(resultPath)
+        env.execute()
+        "0,1,Hi\n" + "0,2,Hello\n" + "1,2,Hello\n" + "0,2,Hello world\n" + "1,2," +
+          "Hello world\n" + "2,2,Hello world\n" + "0,3,I am fine.\n" + "0,3," +
+          "Luke Skywalker\n" + "1,3,Luke Skywalker\n" + "0,4,Comment#1\n" + "1,4," +
+          "Comment#1\n" + "2,4,Comment#1\n" + "0,4,Comment#3\n" + "0,4,Comment#4\n" + "1,4," +
+          "Comment#4\n" + "0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" + "0,5," +
+          "Comment#7\n" + "0,5,Comment#8\n" + "1,5,Comment#8\n" + "0,5,Comment#9\n" + "1,5," +
+          "Comment#9\n" + "2,5,Comment#9\n" + "0,6,Comment#11\n" + "0,6,Comment#12\n" + "1,6," +
+          "Comment#12\n" + "0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" + "0,6," +
+          "Comment#15\n"
+
+      case 7 =>
+        /*
+         * Test flatmap with broadcast set
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        val ints = CollectionDataSets.getIntDataSet(env)
+        val ds = CollectionDataSets.get3TupleDataSet(env).map {
+          t => MutableTuple3(t._1, t._2, t._3)
+        }
+        val bcFlatMapDs = ds.flatMap(
+          new RichFlatMapFunction[MutableTuple3[Int, Long, String],
+            MutableTuple3[Int, Long, String]] {
+            private var f2Replace = 0
+            private val outTuple = MutableTuple3(0, 0L, "")
+            override def open(config: Configuration): Unit = {
+              val ints = getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+              f2Replace = ints.sum
+            }
+            override def flatMap(
+                value: MutableTuple3[Int, Long, String],
+                out: Collector[MutableTuple3[Int, Long, String]]): Unit = {
+                outTuple._1 = f2Replace
+                outTuple._2 = value._2
+                outTuple._3 = value._3
+                out.collect(outTuple)
+            }
+          }).withBroadcastSet(ints, "ints")
+        bcFlatMapDs.writeAsCsv(resultPath)
+        env.execute()
+        "55,1,Hi\n" + "55,2,Hello\n" + "55,2,Hello world\n" + "55,3,Hello world, " +
+          "how are you?\n" + "55,3,I am fine.\n" + "55,3,Luke Skywalker\n" + "55,4," +
+          "Comment#1\n" + "55,4,Comment#2\n" + "55,4,Comment#3\n" + "55,4,Comment#4\n" + "55,5," +
+          "Comment#5\n" + "55,5,Comment#6\n" + "55,5,Comment#7\n" + "55,5,Comment#8\n" + "55,5," +
+          "Comment#9\n" + "55,6,Comment#10\n" + "55,6,Comment#11\n" + "55,6,Comment#12\n" + "55," +
+          "6,Comment#13\n" + "55,6,Comment#14\n" + "55,6,Comment#15\n"
+
+      case _ =>
+        throw new IllegalArgumentException("Invalid program id")
+    }
+  }
+}
+
+
+@RunWith(classOf[Parameterized])
+class FlatMapITCase(config: Configuration) extends JavaProgramTestBase(config) {
+
+  private var curProgId: Int = config.getInteger("ProgramId", -1)
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  protected override def preSubmit(): Unit = {
+    resultPath = getTempDirPath("result")
+  }
+
+  protected def testProgram(): Unit = {
+    expectedResult = FlatMapProgs.runProgram(curProgId, resultPath)
+  }
+
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+}
+
+object FlatMapITCase {
+  @Parameters
+  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
+    val configs = mutable.MutableList[Array[AnyRef]]()
+    for (i <- 1 to FlatMapProgs.NUM_PROGRAMS) {
+      val config = new Configuration()
+      config.setInteger("ProgramId", i)
+      configs += Array(config)
+    }
+
+    configs.asJavaCollection
+  }
+}
+

