flink-commits mailing list archives

From rmetz...@apache.org
Subject [01/11] Really add POJO support and nested keys for Scala API
Date Wed, 08 Oct 2014 09:40:21 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master ec82d973d -> 6be855543
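
This series makes the Scala DataSet API accept POJO types and string key
expressions, including nested field paths, wherever keys are specified. A
minimal sketch of the usage the updated tests exercise, reusing the CustomType
and POJO fixtures from the new CollectionDataSets helper (job setup and sinks
are elided):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.util.CollectionDataSets

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = CollectionDataSets.getCustomTypeDataSet(env)

    ds.groupBy("myInt")                          // key by POJO field name
    ds.join(ds).where("myInt").equalTo("myInt")  // field names as join keys
    CollectionDataSets.getDuplicatePojoDataSet(env)
      .partitionByHash("nestedPojo.longNumber")  // nested key expression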


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
index dd1ac99..fe1dd43 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
@@ -17,10 +17,10 @@
  */
 package org.apache.flink.api.scala.operators
 
+import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.junit.Assert
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.operators.Order
-import org.junit.Ignore
 import org.junit.Test
 
 import org.apache.flink.api.scala._
@@ -96,7 +96,7 @@ class GroupingTest {
     }
   }
 
-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test(expected = classOf[IllegalArgumentException])
   def testGroupByKeyFields2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val longDs = env.fromCollection(emptyLongData)
@@ -105,7 +105,7 @@ class GroupingTest {
     longDs.groupBy("_1")
   }
 
-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test(expected = classOf[IllegalArgumentException])
   def testGroupByKeyFields3(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val customDs = env.fromCollection(customTypeData)
@@ -114,7 +114,7 @@ class GroupingTest {
     customDs.groupBy("_1")
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[RuntimeException])
   def testGroupByKeyFields4(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tupleDs = env.fromCollection(emptyTupleData)
@@ -123,7 +123,15 @@ class GroupingTest {
     tupleDs.groupBy("foo")
   }
 
-  @Ignore
+  @Test
+  def testGroupByKeyFields5(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val customDs = env.fromCollection(customTypeData)
+
+    // should work: POJO fields can now be used as grouping keys
+    customDs.groupBy("myInt")
+  }
+
   @Test
   def testGroupByKeyExpressions1(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -131,24 +139,22 @@ class GroupingTest {
 
     // should work
     try {
-//      ds.groupBy("i");
+      ds.groupBy("myInt")
     }
     catch {
       case e: Exception => Assert.fail()
     }
   }
 
-  @Ignore
-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test(expected = classOf[IllegalArgumentException])
   def testGroupByKeyExpressions2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     // should not work: groups on basic type
-//    longDs.groupBy("l");
     val longDs = env.fromCollection(emptyLongData)
+    longDs.groupBy("l")
   }
 
-  @Ignore
   @Test(expected = classOf[InvalidProgramException])
   def testGroupByKeyExpressions3(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -158,14 +164,13 @@ class GroupingTest {
     customDs.groupBy(0)
   }
 
-  @Ignore
   @Test(expected = classOf[IllegalArgumentException])
   def testGroupByKeyExpressions4(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromCollection(customTypeData)
 
     // should not work, non-existent field
-//    ds.groupBy("myNonExistent");
+    ds.groupBy("myNonExistent")
   }
 
   @Test
@@ -173,7 +178,7 @@ class GroupingTest {
     val env = ExecutionEnvironment.getExecutionEnvironment
     try {
       val customDs = env.fromCollection(customTypeData)
-      customDs.groupBy { _.l }
+      customDs.groupBy { _.myLong }
     }
     catch {
       case e: Exception => Assert.fail()

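The expected-exception changes above trace the new key validation path:
grouping an atomic type by a field expression fails with
IllegalArgumentException, while naming a non-existent tuple field now surfaces
as a RuntimeException. A condensed sketch, assuming the emptyLongData and
emptyTupleData collections from the test class:

    env.fromCollection(emptyLongData).groupBy("_1")    // IllegalArgumentException: atomic type
    env.fromCollection(emptyTupleData).groupBy("foo")  // RuntimeException: unknown field
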
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
index cae936d..0219154 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException
+import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.junit.Assert
 import org.apache.flink.api.common.InvalidProgramException
 import org.junit.Ignore
@@ -132,7 +133,7 @@ class JoinOperatorTest {
     ds1.join(ds2).where("_1", "_2").equalTo("_3")
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[RuntimeException])
   def testJoinKeyFields4(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = env.fromCollection(emptyTupleData)
@@ -142,7 +143,7 @@ class JoinOperatorTest {
     ds1.join(ds2).where("foo").equalTo("_1")
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[RuntimeException])
   def testJoinKeyFields5(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = env.fromCollection(emptyTupleData)
@@ -152,7 +153,7 @@ class JoinOperatorTest {
     ds1.join(ds2).where("_1").equalTo("bar")
   }
 
-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test(expected = classOf[IllegalArgumentException])
   def testJoinKeyFields6(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = env.fromCollection(emptyTupleData)
@@ -162,7 +163,6 @@ class JoinOperatorTest {
     ds1.join(ds2).where("_2").equalTo("_1")
   }
 
-  @Ignore
   @Test
   def testJoinKeyExpressions1(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -171,36 +171,33 @@ class JoinOperatorTest {
 
     // should work
     try {
-//      ds1.join(ds2).where("i").equalTo("i")
+      ds1.join(ds2).where("myInt").equalTo("myInt")
     }
     catch {
       case e: Exception => Assert.fail()
     }
   }
 
-  @Ignore
-  @Test(expected = classOf[InvalidProgramException])
+  @Test(expected = classOf[IncompatibleKeysException])
   def testJoinKeyExpressions2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = env.fromCollection(customTypeData)
     val ds2 = env.fromCollection(customTypeData)
 
     // should not work, incompatible join key types
-//    ds1.join(ds2).where("i").equalTo("s")
+    ds1.join(ds2).where("myInt").equalTo("myString")
   }
 
-  @Ignore
-  @Test(expected = classOf[InvalidProgramException])
+  @Test(expected = classOf[IncompatibleKeysException])
   def testJoinKeyExpressions3(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = env.fromCollection(customTypeData)
     val ds2 = env.fromCollection(customTypeData)
 
     // should not work, incompatible number of keys
-//    ds1.join(ds2).where("i", "s").equalTo("i")
+    ds1.join(ds2).where("myInt", "myString").equalTo("myInt")
   }
 
-  @Ignore
   @Test(expected = classOf[IllegalArgumentException])
   def testJoinKeyExpressions4(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -208,7 +205,7 @@ class JoinOperatorTest {
     val ds2 = env.fromCollection(customTypeData)
 
     // should not work, join key non-existent
-//    ds1.join(ds2).where("myNonExistent").equalTo("i")
+    ds1.join(ds2).where("myNonExistent").equalTo("i")
   }
 
   @Test
@@ -219,7 +216,7 @@ class JoinOperatorTest {
 
     // should work
     try {
-      ds1.join(ds2).where { _.l} equalTo { _.l }
+      ds1.join(ds2).where { _.myLong } equalTo { _.myLong }
     }
     catch {
       case e: Exception => Assert.fail()
@@ -234,7 +231,7 @@ class JoinOperatorTest {
 
     // should work
     try {
-      ds1.join(ds2).where { _.l }.equalTo(3)
+      ds1.join(ds2).where { _.myLong }.equalTo(3)
     }
     catch {
       case e: Exception => Assert.fail()
@@ -249,7 +246,7 @@ class JoinOperatorTest {
 
     // should work
     try {
-      ds1.join(ds2).where(3).equalTo { _.l }
+      ds1.join(ds2).where(3).equalTo { _.myLong }
     }
     catch {
       case e: Exception => Assert.fail()
@@ -263,7 +260,7 @@ class JoinOperatorTest {
     val ds2 = env.fromCollection(customTypeData)
 
     // should not work, incompatible types
-    ds1.join(ds2).where(2).equalTo { _.l }
+    ds1.join(ds2).where(2).equalTo { _.myLong }
   }
 
   @Test(expected = classOf[IncompatibleKeysException])
@@ -273,7 +270,7 @@ class JoinOperatorTest {
     val ds2 = env.fromCollection(customTypeData)
 
     // should not work, more than one field position key
-    ds1.join(ds2).where(1, 3) equalTo { _.l }
+    ds1.join(ds2).where(1, 3) equalTo { _.myLong }
   }
 }
 
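A note on the selector tests above: key selector functions can be mixed with
field positions across the two join inputs as long as the key types and key
counts line up; mismatches now fail with Keys.IncompatibleKeysException. A
sketch, assuming ds1 is the tuple-typed and ds2 the CustomType data set from
the tests:

    ds1.join(ds2).where(3).equalTo { _.myLong }     // works: field 3 is a Long
    ds1.join(ds2).where(2).equalTo { _.myLong }     // fails: field 2 is not a Long
    ds1.join(ds2).where(1, 3) equalTo { _.myLong }  // fails: two fields against one key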

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index a8447a9..bea91df 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -19,6 +19,7 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunction}
 import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.test.util.JavaProgramTestBase
 import org.junit.runner.RunWith
@@ -32,40 +33,18 @@ import org.apache.flink.api.scala._
 
 
 object PartitionProgs {
-  var NUM_PROGRAMS: Int = 6
-
-  val tupleInput = Array(
-    (1, "Foo"),
-    (1, "Foo"),
-    (1, "Foo"),
-    (2, "Foo"),
-    (2, "Foo"),
-    (2, "Foo"),
-    (2, "Foo"),
-    (2, "Foo"),
-    (3, "Foo"),
-    (3, "Foo"),
-    (3, "Foo"),
-    (4, "Foo"),
-    (4, "Foo"),
-    (4, "Foo"),
-    (4, "Foo"),
-    (5, "Foo"),
-    (5, "Foo"),
-    (6, "Foo"),
-    (6, "Foo"),
-    (6, "Foo"),
-    (6, "Foo")
-  )
-
+  var NUM_PROGRAMS: Int = 7
 
   def runProgram(progId: Int, resultPath: String, onCollection: Boolean): String = {
     progId match {
       case 1 =>
+        /*
+         * Test hash partition by tuple field
+         */
         val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.fromCollection(tupleInput)
+        val ds = CollectionDataSets.get3TupleDataSet(env)
 
-        val unique = ds.partitionByHash(0).mapPartition( _.map(_._1).toSet )
+        val unique = ds.partitionByHash(1).mapPartition( _.map(_._2).toSet )
 
         unique.writeAsText(resultPath)
         env.execute()
@@ -73,16 +52,22 @@ object PartitionProgs {
         "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
 
       case 2 =>
+        /*
+         * Test hash partition by key selector
+         */
         val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.fromCollection(tupleInput)
-        val unique = ds.partitionByHash( _._1 ).mapPartition( _.map(_._1).toSet )
+        val ds = CollectionDataSets.get3TupleDataSet(env)
+        val unique = ds.partitionByHash( _._2 ).mapPartition( _.map(_._2).toSet )
 
         unique.writeAsText(resultPath)
         env.execute()
         "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
 
       case 3 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
+        /*
+         * Test forced rebalancing
+         */
+        val env = ExecutionEnvironment.getExecutionEnvironment
         val ds = env.generateSequence(1, 3000)
 
         val skewed = ds.filter(_ > 780)
@@ -101,8 +86,8 @@ object PartitionProgs {
         countsInPartition.writeAsText(resultPath)
         env.execute()
 
-        val numPerPartition : Int = 2220 / env.getDegreeOfParallelism / 10;
-        var result = "";
+        val numPerPartition : Int = 2220 / env.getDegreeOfParallelism / 10
+        var result = ""
         for (i <- 0 until env.getDegreeOfParallelism) {
           result += "(" + i + "," + numPerPartition + ")\n"
         }
@@ -112,10 +97,12 @@ object PartitionProgs {
         // Verify that mapPartition operation after repartition picks up correct
         // DOP
         val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.fromCollection(tupleInput)
+        val ds = CollectionDataSets.get3TupleDataSet(env)
         env.setDegreeOfParallelism(1)
 
-        val unique = ds.partitionByHash(0).setParallelism(4).mapPartition( _.map(_._1).toSet )
+        val unique = ds.partitionByHash(1)
+          .setParallelism(4)
+          .mapPartition( _.map(_._2).toSet )
 
         unique.writeAsText(resultPath)
         env.execute()
@@ -126,13 +113,13 @@ object PartitionProgs {
         // Verify that map operation after repartition picks up correct
         // DOP
         val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.fromCollection(tupleInput)
+        val ds = CollectionDataSets.get3TupleDataSet(env)
         env.setDegreeOfParallelism(1)
 
         val count = ds.partitionByHash(0).setParallelism(4).map(
-          new RichMapFunction[(Int, String), Tuple1[Int]] {
+          new RichMapFunction[(Int, Long, String), Tuple1[Int]] {
             var first = true
-            override def map(in: (Int, String)): Tuple1[Int] = {
+            override def map(in: (Int, Long, String)): Tuple1[Int] = {
               // only output one value with count 1
               if (first) {
                 first = false
@@ -152,13 +139,13 @@ object PartitionProgs {
         // Verify that filter operation after repartition picks up correct
         // DOP
         val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.fromCollection(tupleInput)
+        val ds = CollectionDataSets.get3TupleDataSet(env)
         env.setDegreeOfParallelism(1)
 
         val count = ds.partitionByHash(0).setParallelism(4).filter(
-          new RichFilterFunction[(Int, String)] {
+          new RichFilterFunction[(Int, Long, String)] {
             var first = true
-            override def filter(in: (Int, String)): Boolean = {
+            override def filter(in: (Int, Long, String)): Boolean = {
               // only output one value with count 1
               if (first) {
                 first = false
@@ -175,6 +162,19 @@ object PartitionProgs {
 
         if (onCollection) "(1)\n" else "(4)\n"
 
+      case 7 =>
+        val env = ExecutionEnvironment.getExecutionEnvironment
+        env.setDegreeOfParallelism(3)
+        val ds = CollectionDataSets.getDuplicatePojoDataSet(env)
+        val uniqLongs = ds
+          .partitionByHash("nestedPojo.longNumber")
+          .setParallelism(4)
+          .mapPartition( _.map(_.nestedPojo.longNumber).toSet )
+
+        uniqLongs.writeAsText(resultPath)
+        env.execute()
+        "10000\n" + "20000\n" + "30000\n"
+
       case _ =>
         throw new IllegalArgumentException("Invalid program id")
     }
@@ -194,7 +194,7 @@ class PartitionITCase(config: Configuration) extends JavaProgramTestBase(config)
   }
 
   protected def testProgram(): Unit = {
     expectedResult = PartitionProgs.runProgram(curProgId, resultPath, isCollectionExecution)
   }
 
   protected override def postSubmit(): Unit = {

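The new program 7 exercises the headline feature: hash partitioning by a
nested POJO key expression. All records with the same nestedPojo.longNumber
land in the same partition, so collecting the distinct values per partition
emits each of the three longs exactly once. Condensed from case 7 above:

    val uniqLongs = CollectionDataSets.getDuplicatePojoDataSet(env)
      .partitionByHash("nestedPojo.longNumber")
      .mapPartition( _.map(_.nestedPojo.longNumber).toSet )
    // expected output: 10000, 20000, 30000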
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
index d33da41..2b2d3a9 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
@@ -44,7 +44,9 @@ class CustomType(var myField1: String, var myField2: Int) {
   }
 }
 
-class MyObject[A](var a: A)
+class MyObject[A](var a: A) {
+  def this() { this(null.asInstanceOf[A]) }
+}
 
 class TypeInformationGenTest {
 
@@ -139,7 +141,7 @@ class TypeInformationGenTest {
 
     Assert.assertFalse(ti.isBasicType)
     Assert.assertFalse(ti.isTupleType)
-    Assert.assertTrue(ti.isInstanceOf[GenericTypeInfo[_]])
+    Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
     Assert.assertEquals(ti.getTypeClass, classOf[CustomType])
   }
 
@@ -152,7 +154,7 @@ class TypeInformationGenTest {
     val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
     Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeClass)
     Assert.assertEquals(classOf[java.lang.Long], tti.getTypeAt(0).getTypeClass)
-    Assert.assertTrue(tti.getTypeAt(1).isInstanceOf[GenericTypeInfo[_]])
+    Assert.assertTrue(tti.getTypeAt(1).isInstanceOf[PojoTypeInfo[_]])
     Assert.assertEquals(classOf[CustomType], tti.getTypeAt(1).getTypeClass)
   }
 
@@ -235,7 +237,7 @@ class TypeInformationGenTest {
   def testParamertizedCustomObject(): Unit = {
     val ti = createTypeInformation[MyObject[String]]
 
-    Assert.assertTrue(ti.isInstanceOf[GenericTypeInfo[_]])
+    Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
   }
 
   @Test

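The flipped assertions reflect that the Scala type analyzer now classifies
these classes as POJOs (PojoTypeInfo) instead of falling back to
GenericTypeInfo; the zero-argument constructor added to MyObject is what makes
it analyzable as a POJO. In short:

    val ti = createTypeInformation[CustomType]
    Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
    Assert.assertEquals(classOf[CustomType], ti.getTypeClass)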
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
new file mode 100644
index 0000000..60f86a0
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.util
+
+import org.apache.hadoop.io.IntWritable
+
+import org.apache.flink.api.scala._
+
+import scala.collection.mutable
+import scala.util.Random
+
+/**
+ * #################################################################################################
+ *
+ * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
+ * IF YOU MODIFY THE DATA, MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
+ *
+ * #################################################################################################
+ */
+object CollectionDataSets {
+  def get3TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = {
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Hi"))
+    data.+=((2, 2L, "Hello"))
+    data.+=((3, 2L, "Hello world"))
+    data.+=((4, 3L, "Hello world, how are you?"))
+    data.+=((5, 3L, "I am fine."))
+    data.+=((6, 3L, "Luke Skywalker"))
+    data.+=((7, 4L, "Comment#1"))
+    data.+=((8, 4L, "Comment#2"))
+    data.+=((9, 4L, "Comment#3"))
+    data.+=((10, 4L, "Comment#4"))
+    data.+=((11, 5L, "Comment#5"))
+    data.+=((12, 5L, "Comment#6"))
+    data.+=((13, 5L, "Comment#7"))
+    data.+=((14, 5L, "Comment#8"))
+    data.+=((15, 5L, "Comment#9"))
+    data.+=((16, 6L, "Comment#10"))
+    data.+=((17, 6L, "Comment#11"))
+    data.+=((18, 6L, "Comment#12"))
+    data.+=((19, 6L, "Comment#13"))
+    data.+=((20, 6L, "Comment#14"))
+    data.+=((21, 6L, "Comment#15"))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getSmall3TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = {
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Hi"))
+    data.+=((2, 2L, "Hello"))
+    data.+=((3, 2L, "Hello world"))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def get5TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, Int, String, Long)] = {
+    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
+    data.+=((1, 1L, 0, "Hallo", 1L))
+    data.+=((2, 2L, 1, "Hallo Welt", 2L))
+    data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
+    data.+=((3, 4L, 3, "Hallo Welt wie gehts?", 2L))
+    data.+=((3, 5L, 4, "ABC", 2L))
+    data.+=((3, 6L, 5, "BCD", 3L))
+    data.+=((4, 7L, 6, "CDE", 2L))
+    data.+=((4, 8L, 7, "DEF", 1L))
+    data.+=((4, 9L, 8, "EFG", 1L))
+    data.+=((4, 10L, 9, "FGH", 2L))
+    data.+=((5, 11L, 10, "GHI", 1L))
+    data.+=((5, 12L, 11, "HIJ", 3L))
+    data.+=((5, 13L, 12, "IJK", 3L))
+    data.+=((5, 14L, 13, "JKL", 2L))
+    data.+=((5, 15L, 14, "KLM", 2L))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getSmall5TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, Int, String, Long)] = {
+    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
+    data.+=((1, 1L, 0, "Hallo", 1L))
+    data.+=((2, 2L, 1, "Hallo Welt", 2L))
+    data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getSmallNestedTupleDataSet(env: ExecutionEnvironment): DataSet[((Int, Int), String)] = {
+    val data = new mutable.MutableList[((Int, Int), String)]
+    data.+=(((1, 1), "one"))
+    data.+=(((2, 2), "two"))
+    data.+=(((3, 3), "three"))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getGroupSortedNestedTupleDataSet(env: ExecutionEnvironment): DataSet[((Int, Int), String)] = {
+    val data = new mutable.MutableList[((Int, Int), String)]
+    data.+=(((1, 3), "a"))
+    data.+=(((1, 2), "a"))
+    data.+=(((2, 1), "a"))
+    data.+=(((2, 2), "b"))
+    data.+=(((3, 3), "c"))
+    data.+=(((3, 6), "c"))
+    data.+=(((4, 9), "c"))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getStringDataSet(env: ExecutionEnvironment): DataSet[String] = {
+    val data = new mutable.MutableList[String]
+    data.+=("Hi")
+    data.+=("Hello")
+    data.+=("Hello world")
+    data.+=("Hello world, how are you?")
+    data.+=("I am fine.")
+    data.+=("Luke Skywalker")
+    data.+=("Random comment")
+    data.+=("LOL")
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getIntDataSet(env: ExecutionEnvironment): DataSet[Int] = {
+    val data = new mutable.MutableList[Int]
+    data.+=(1)
+    data.+=(2)
+    data.+=(2)
+    data.+=(3)
+    data.+=(3)
+    data.+=(3)
+    data.+=(4)
+    data.+=(4)
+    data.+=(4)
+    data.+=(4)
+    data.+=(5)
+    data.+=(5)
+    data.+=(5)
+    data.+=(5)
+    data.+=(5)
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getCustomTypeDataSet(env: ExecutionEnvironment): DataSet[CustomType] = {
+    val data = new mutable.MutableList[CustomType]
+    data.+=(new CustomType(1, 0L, "Hi"))
+    data.+=(new CustomType(2, 1L, "Hello"))
+    data.+=(new CustomType(2, 2L, "Hello world"))
+    data.+=(new CustomType(3, 3L, "Hello world, how are you?"))
+    data.+=(new CustomType(3, 4L, "I am fine."))
+    data.+=(new CustomType(3, 5L, "Luke Skywalker"))
+    data.+=(new CustomType(4, 6L, "Comment#1"))
+    data.+=(new CustomType(4, 7L, "Comment#2"))
+    data.+=(new CustomType(4, 8L, "Comment#3"))
+    data.+=(new CustomType(4, 9L, "Comment#4"))
+    data.+=(new CustomType(5, 10L, "Comment#5"))
+    data.+=(new CustomType(5, 11L, "Comment#6"))
+    data.+=(new CustomType(5, 12L, "Comment#7"))
+    data.+=(new CustomType(5, 13L, "Comment#8"))
+    data.+=(new CustomType(5, 14L, "Comment#9"))
+    data.+=(new CustomType(6, 15L, "Comment#10"))
+    data.+=(new CustomType(6, 16L, "Comment#11"))
+    data.+=(new CustomType(6, 17L, "Comment#12"))
+    data.+=(new CustomType(6, 18L, "Comment#13"))
+    data.+=(new CustomType(6, 19L, "Comment#14"))
+    data.+=(new CustomType(6, 20L, "Comment#15"))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getSmallCustomTypeDataSet(env: ExecutionEnvironment): DataSet[CustomType] = {
+    val data = new mutable.MutableList[CustomType]
+    data.+=(new CustomType(1, 0L, "Hi"))
+    data.+=(new CustomType(2, 1L, "Hello"))
+    data.+=(new CustomType(2, 2L, "Hello world"))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getSmallTuplebasedPojoMatchingDataSet(env: ExecutionEnvironment):
+      DataSet[(Int, String, Int, Int, Long, String, Long)] = {
+    val data = new mutable.MutableList[(Int, String, Int, Int, Long, String, Long)]
+    data.+=((1, "First", 10, 100, 1000L, "One", 10000L))
+    data.+=((2, "Second", 20, 200, 2000L, "Two", 20000L))
+    data.+=((3, "Third", 30, 300, 3000L, "Three", 30000L))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getSmallPojoDataSet(env: ExecutionEnvironment): DataSet[POJO] = {
+    val data = new mutable.MutableList[POJO]
+    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
+    data.+=(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L))
+    data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L))
+    env.fromCollection(Random.shuffle(data))
+  }
+
+  def getDuplicatePojoDataSet(env: ExecutionEnvironment): DataSet[POJO] = {
+    val data = new mutable.MutableList[POJO]
+    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
+    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
+    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
+    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
+    data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L))
+    data.+=(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L))
+    data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L))
+    data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L))
+    env.fromCollection(data)
+  }
+
+  def getCrazyNestedDataSet(env: ExecutionEnvironment): DataSet[CrazyNested] = {
+    val data = new mutable.MutableList[CrazyNested]
+    data.+=(new CrazyNested("aa"))
+    data.+=(new CrazyNested("bb"))
+    data.+=(new CrazyNested("bb"))
+    data.+=(new CrazyNested("cc"))
+    data.+=(new CrazyNested("cc"))
+    data.+=(new CrazyNested("cc"))
+    env.fromCollection(data)
+  }
+
+  def getPojoContainingTupleAndWritable(env: ExecutionEnvironment):
+      DataSet[PojoContainingTupleAndWritable] = {
+    val data = new mutable.MutableList[PojoContainingTupleAndWritable]
+    data.+=(new PojoContainingTupleAndWritable(1, 10L, 100L))
+    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
+    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
+    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
+    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
+    data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L))
+    env.fromCollection(data)
+  }
+
+  def getTupleContainingPojos(env: ExecutionEnvironment): DataSet[(Int, CrazyNested, POJO)] = {
+    val data = new mutable.MutableList[(Int, CrazyNested, POJO)]
+    data.+=((
+      1,
+      new CrazyNested("one", "uno", 1L),
+      new POJO(1, "First", 10, 100, 1000L, "One", 10000L)))
+    data.+=((
+      1,
+      new CrazyNested("one", "uno", 1L),
+      new POJO(1, "First", 10, 100, 1000L, "One", 10000L)))
+    data.+=((
+      1,
+      new CrazyNested("one", "uno", 1L),
+      new POJO(1, "First", 10, 100, 1000L, "One", 10000L)))
+    data.+=((
+      2,
+      new CrazyNested("two", "duo", 2L),
+      new POJO(1, "First", 10, 100, 1000L, "One", 10000L)))
+    env.fromCollection(data)
+  }
+
+  def getPojoWithMultiplePojos(env: ExecutionEnvironment): DataSet[PojoWithMultiplePojos] = {
+    val data = new mutable.MutableList[PojoWithMultiplePojos]
+    data.+=(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1))
+    data.+=(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2))
+    data.+=(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3))
+    env.fromCollection(data)
+  }
+
+
+  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
+    def this() {
+      this(0, 0, "")
+    }
+
+    override def toString: String = {
+      myInt + "," + myLong + "," + myString
+    }
+  }
+
+  class POJO(
+      var number: Int,
+      var str: String,
+      var nestedTupleWithCustom: (Int, CustomType),
+      var nestedPojo: NestedPojo) {
+    def this() {
+      this(0, "", null, null)
+    }
+
+    def this(i0: Int, s0: String, i1: Int, i2: Int, l0: Long, s1: String, l1: Long) {
+      this(i0, s0, (i1, new CustomType(i2, l0, s1)), new NestedPojo(l1))
+    }
+
+    override def toString: String = {
+      number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber
+    }
+
+    @transient var ignoreMe: Long = 1L
+  }
+
+  class NestedPojo(var longNumber: Long) {
+    def this() {
+      this(0)
+    }
+  }
+
+  class CrazyNested(var nest_Lvl1: CrazyNestedL1, var something: Long) {
+    def this() {
+      this(new CrazyNestedL1, 0)
+    }
+
+    def this(set: String) {
+      this()
+      nest_Lvl1 = new CrazyNestedL1
+      nest_Lvl1.nest_Lvl2 = new CrazyNestedL2
+      nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3
+      nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4
+      nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = set
+    }
+
+    def this(set: String, second: String, s: Long) {
+      this(set)
+      something = s
+      nest_Lvl1.a = second
+    }
+  }
+
+  class CrazyNestedL1 {
+    var a: String = null
+    var b: Int = 0
+    var nest_Lvl2: CrazyNestedL2 = null
+  }
+
+  class CrazyNestedL2 {
+    var nest_Lvl3: CrazyNestedL3 = null
+  }
+
+  class CrazyNestedL3 {
+    var nest_Lvl4: CrazyNestedL4 = null
+  }
+
+  class CrazyNestedL4 {
+    var f1nal: String = null
+  }
+
+  class PojoContainingTupleAndWritable(
+      var someInt: Int,
+      var someString: String,
+      var hadoopFan: IntWritable,
+      var theTuple: (Long, Long)) {
+    def this() {
+      this(0, "", new IntWritable(0), (0, 0))
+    }
+
+    def this(i: Int, l1: Long, l2: Long) {
+      this()
+      hadoopFan = new IntWritable(i)
+      someInt = i
+      theTuple = (l1, l2)
+    }
+
+  }
+
+  class Pojo1 {
+    var a: String = null
+    var b: String = null
+  }
+
+  class Pojo2 {
+    var a2: String = null
+    var b2: String = null
+  }
+
+  class PojoWithMultiplePojos {
+
+    def this(a: String, b: String, a1: String, b1: String, i0: Int) {
+      this()
+      p1 = new Pojo1
+      p1.a = a
+      p1.b = b
+      p2 = new Pojo2
+      p2.a2 = a1
+      p2.b2 = b1
+      this.i0 = i0
+    }
+
+    var p1: Pojo1 = null
+    var p2: Pojo2 = null
+    var i0: Int = 0
+  }
+
+}
+

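For reference, the fixture classes above follow the shape the POJO analyzer
expects: public var fields and a no-argument constructor, with any convenience
constructors delegating to it. The smallest example from the file:

    class NestedPojo(var longNumber: Long) {
      def this() { this(0) }  // no-arg constructor required for POJO types
    }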
