flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject git commit: Enable forgotten JoinITCase (for POJOs) for Scala and Java API
Date Mon, 13 Oct 2014 14:43:41 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master ff2191747 -> 480f4930e


Enable forgotten JoinITCase (for POJOs) for Scala and Java API


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/480f4930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/480f4930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/480f4930

Branch: refs/heads/master
Commit: 480f4930ef3c4e3a0a18256d7f54003263c865d3
Parents: ff21917
Author: Robert Metzger <metzgerr@web.de>
Authored: Sun Oct 12 10:32:48 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Mon Oct 13 16:43:16 2014 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/PojoComparator.java  |  3 +--
 .../java/typeutils/runtime/TupleComparator.java |  1 -
 .../flink/api/scala/operators/JoinITCase.scala  | 10 ++++-----
 .../api/scala/util/CollectionDataSets.scala     |  9 ++++++++
 .../test/javaApiOperators/CoGroupITCase.java    |  6 +++---
 .../flink/test/javaApiOperators/JoinITCase.java | 22 ++++++++++----------
 .../util/CollectionDataSets.java                | 22 ++++++++++++++++----
 7 files changed, 47 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/480f4930/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index 2cccfcf..7c15ecd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -199,7 +199,7 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 			throw new NullKeyFieldException("Unable to access field "+field+" on object "+object);
 		} catch (IllegalAccessException iaex) {
 			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo."
-			+ " fiels: " + field + " obj: " + object);
+			+ " fields: " + field + " obj: " + object);
 		}
 		return object;
 	}
@@ -211,7 +211,6 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 		for (; i < this.keyFields.length; i++) {
 			code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
 			code += this.comparators[i].hash(accessField(keyFields[i], value));
-			
 		}
 		return code;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/480f4930/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index 89b7794..875ecc2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -49,7 +49,6 @@ public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<
 		int i = 0;
 		try {
 			int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0]));
-			
 			for (i = 1; i < this.keyPositions.length; i++) {
 				code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
 				code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i]));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/480f4930/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
index 4a2355c..2605830 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
@@ -34,7 +34,7 @@ import org.apache.flink.api.scala._
 
 
 object JoinProgs {
-  var NUM_PROGRAMS: Int = 19
+  var NUM_PROGRAMS: Int = 20
 
   def runProgram(progId: Int, resultPath: String): String = {
     progId match {
@@ -324,14 +324,14 @@ object JoinProgs {
          */
         val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
         val ds1 = CollectionDataSets.getSmallPojoDataSet(env)
-        val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+        val ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env)
         val joinDs = ds1.join(ds2).where("*").equalTo("*")
         joinDs.writeAsCsv(resultPath)
         env.setDegreeOfParallelism(1)
         env.execute()
-        "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + "2 Second (20,200," +
-          "2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + "3 Third (30,300,3000," +
-          "Three) 30000,(3,Third,30,300,3000,Three,30000)\n"
+        "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n" +
+        "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n" +
+        "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n"
       
       case _ =>
         throw new IllegalArgumentException("Invalid program id: " + progId)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/480f4930/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
index ace195a..09e049b 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
@@ -271,6 +271,15 @@ object CollectionDataSets {
     env.fromCollection(data)
   }
 
+  def getSmallTuplebasedDataSetMatchingPojo(env: ExecutionEnvironment):
+    DataSet[(Long, Integer, Integer, Long, String, Integer, String)] = {
+    val data = new mutable.MutableList[(Long, Integer, Integer, Long, String, Integer, String)]
+    data.+=((10000L, 10, 100, 1000L, "One", 1, "First"))
+    data.+=((20000L, 20, 200, 2000L, "Two", 2, "Second"))
+    data.+=((30000L, 30, 300, 3000L, "Three", 3, "Third"))
+    env.fromCollection(data)
+  }
+
   def getPojoWithMultiplePojos(env: ExecutionEnvironment): DataSet[CollectionDataSets
   .PojoWithMultiplePojos] = {
     val data = new mutable.MutableList[CollectionDataSets

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/480f4930/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index f0e89df..ffc208c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -390,7 +390,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 				DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
 						.where("nestedPojo.longNumber").equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer,
String, Integer, Integer, Long, String, Long>, CustomType>() {
 						private static final long serialVersionUID = 1L;
@@ -425,7 +425,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 				DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
 						.where(new KeySelector<POJO, Tuple1<Long>>() {
 							private static final long serialVersionUID = 1L;
@@ -468,7 +468,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 				DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
 						.where(new KeySelector<POJO, Long>() {
 							private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/480f4930/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index e8d8be9..bfae922 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -48,7 +48,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class JoinITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 21;
+	private static int NUM_PROGRAMS = 22;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -505,7 +505,7 @@ public class JoinITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String,
Long> >> joinDs = 
 						ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6");
 				
@@ -525,7 +525,7 @@ public class JoinITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String,
Long> >> joinDs = 
 						ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference!
 				
@@ -544,7 +544,7 @@ public class JoinITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String,
Long> >> joinDs = 
 						ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1");
 				
@@ -565,7 +565,7 @@ public class JoinITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String,
Long> >> joinDs = 
 						ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2");
 				
@@ -586,7 +586,7 @@ public class JoinITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
+				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedDataSet(env);
 				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String,
Long> >> joinDs = 
 						ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4");
 				
@@ -649,8 +649,8 @@ public class JoinITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-				DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2
= CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env);
-				DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String,
Long> >> joinDs = 
+				DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> ds2
= CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env);
+				DataSet<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>
>> joinDs = 
 						ds1.join(ds2).where("*").equalTo("*");
 				
 				joinDs.writeAsCsv(resultPath);
@@ -658,9 +658,9 @@ public class JoinITCase extends JavaProgramTestBase {
 				env.execute();
 				
 				// return expected result
-				return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-					   "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-					   "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+				return "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+
+						"2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+
+						"3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
 			}
 			default: 
 				throw new IllegalArgumentException("Invalid program id: "+progId);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/480f4930/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 6757f66..0f8097a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -286,17 +287,32 @@ public class CollectionDataSets {
 		}
 	}
 
-	public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>>
getSmallTuplebasedPojoMatchingDataSet(ExecutionEnvironment env) {
+	public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>>
getSmallTuplebasedDataSet(ExecutionEnvironment env) {
 		List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data =
new ArrayList<Tuple7<Integer, String, Integer, Integer, Long, String, Long>>();
 		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(1, "First",
10, 100, 1000L, "One", 10000L));
 		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(2, "Second",
20, 200, 2000L, "Two", 20000L));
 		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(3, "Third",
30, 300, 3000L, "Three", 30000L));
 		return env.fromCollection(data);
 	}
+	
+	public static DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>>
getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
+		List<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> data =
+				new ArrayList<Tuple7<Long, Integer, Integer, Long, String, Integer, String>>();
+		data.add(new Tuple7<Long, Integer, Integer, Long, String, Integer, String>
+				(10000L, 10, 100, 1000L, "One", 1, "First"));
+		
+		data.add(new Tuple7<Long, Integer, Integer, Long, String, Integer, String>
+		(20000L, 20, 200, 2000L, "Two", 2, "Second"));
+		
+		data.add(new Tuple7<Long, Integer, Integer, Long, String, Integer, String>
+		(30000L, 30, 300, 3000L, "Three", 3, "Third"));
+		
+		return env.fromCollection(data);
+	}
 
 	public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
 		List<POJO> data = new ArrayList<POJO>();
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/,
"One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/));
 		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
 		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
 		return env.fromCollection(data);
@@ -320,7 +336,6 @@ public class CollectionDataSets {
 		public String str;
 		public Tuple2<Integer, CustomType> nestedTupleWithCustom;
 		public NestedPojo nestedPojo;
-		public Date date;
 		public transient Long ignoreMe;
 
 		public POJO(int i0, String s0,
@@ -330,7 +345,6 @@ public class CollectionDataSets {
 			this.str = s0;
 			this.nestedTupleWithCustom = new Tuple2<Integer, CustomType>(i1, new CustomType(i2,
l0, s1));
 			this.nestedPojo = new NestedPojo();
-			this.date = new Date();
 			this.nestedPojo.longNumber = l1;
 		}
 


Mime
View raw message