Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4440E17940 for ; Mon, 13 Oct 2014 16:11:52 +0000 (UTC) Received: (qmail 11612 invoked by uid 500); 13 Oct 2014 16:11:52 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 11588 invoked by uid 500); 13 Oct 2014 16:11:52 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 11579 invoked by uid 99); 13 Oct 2014 16:11:52 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Oct 2014 16:11:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 13 Oct 2014 16:11:27 +0000 Received: (qmail 8154 invoked by uid 99); 13 Oct 2014 16:11:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Oct 2014 16:11:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E3D9C90DA11; Mon, 13 Oct 2014 16:11:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.incubator.apache.org Date: Mon, 13 Oct 2014 16:11:27 -0000 Message-Id: <299cabaafc2a486ea6d179a587c12c4f@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/5] git commit: Enable forgotten JoinITCase (for POJOs) for Scala and Java API X-Virus-Checked: Checked by ClamAV on apache.org Enable forgotten JoinITCase (for POJOs) for Scala and Java API Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fd31185e Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fd31185e Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fd31185e Branch: refs/heads/release-0.7 Commit: fd31185e284b1404443553c64befc94efcfdbb75 Parents: aa62ad1 Author: Robert Metzger Authored: Sun Oct 12 10:32:48 2014 +0200 Committer: Robert Metzger Committed: Mon Oct 13 18:10:32 2014 +0200 ---------------------------------------------------------------------- .../java/typeutils/runtime/PojoComparator.java | 3 +-- .../java/typeutils/runtime/TupleComparator.java | 1 - .../flink/api/scala/operators/JoinITCase.scala | 10 ++++----- .../api/scala/util/CollectionDataSets.scala | 9 ++++++++ .../test/javaApiOperators/CoGroupITCase.java | 6 +++--- .../flink/test/javaApiOperators/JoinITCase.java | 22 ++++++++++---------- .../util/CollectionDataSets.java | 22 ++++++++++++++++---- 7 files changed, 47 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd31185e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java index 2cccfcf..7c15ecd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java @@ -199,7 +199,7 @@ public final class PojoComparator extends CompositeTypeComparator implemen throw new NullKeyFieldException("Unable to access field "+field+" on object "+object); } catch (IllegalAccessException iaex) { throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo." - + " fiels: " + field + " obj: " + object); + + " fields: " + field + " obj: " + object); } return object; } @@ -211,7 +211,6 @@ public final class PojoComparator extends CompositeTypeComparator implemen for (; i < this.keyFields.length; i++) { code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; code += this.comparators[i].hash(accessField(keyFields[i], value)); - } return code; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd31185e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java index 89b7794..875ecc2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java @@ -49,7 +49,6 @@ public final class TupleComparator extends TupleComparatorBase< int i = 0; try { int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0])); - for (i = 1; i < this.keyPositions.length; i++) { code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i])); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd31185e/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala index 4a2355c..2605830 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala @@ -34,7 +34,7 @@ import org.apache.flink.api.scala._ object JoinProgs { - var NUM_PROGRAMS: Int = 19 + var NUM_PROGRAMS: Int = 20 def runProgram(progId: Int, resultPath: String): String = { progId match { @@ -324,14 +324,14 @@ object JoinProgs { */ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds1 = CollectionDataSets.getSmallPojoDataSet(env) - val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env) + val ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env) val joinDs = ds1.join(ds2).where("*").equalTo("*") joinDs.writeAsCsv(resultPath) env.setDegreeOfParallelism(1) env.execute() - "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + "2 Second (20,200," + - "2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + "3 Third (30,300,3000," + - "Three) 30000,(3,Third,30,300,3000,Three,30000)\n" + "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n" + + "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n" + + "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n" case _ => throw new IllegalArgumentException("Invalid program id: " + progId) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd31185e/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala index ace195a..09e049b 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala @@ -271,6 +271,15 @@ object CollectionDataSets { env.fromCollection(data) } + def getSmallTuplebasedDataSetMatchingPojo(env: ExecutionEnvironment): + DataSet[(Long, Integer, Integer, Long, String, Integer, String)] = { + val data = new mutable.MutableList[(Long, Integer, Integer, Long, String, Integer, String)] + data.+=((10000L, 10, 100, 1000L, "One", 1, "First")) + data.+=((20000L, 20, 200, 2000L, "Two", 2, "Second")) + data.+=((30000L, 30, 300, 3000L, "Three", 3, "Third")) + env.fromCollection(data) + } + def getPojoWithMultiplePojos(env: ExecutionEnvironment): DataSet[CollectionDataSets .PojoWithMultiplePojos] = { val data = new mutable.MutableList[CollectionDataSets http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd31185e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java index f0e89df..ffc208c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java @@ -390,7 +390,7 @@ public class CoGroupITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet coGroupDs = ds.coGroup(ds2) .where("nestedPojo.longNumber").equalTo(6).with(new CoGroupFunction, CustomType>() { private static final long serialVersionUID = 1L; @@ -425,7 +425,7 @@ public class CoGroupITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet coGroupDs = ds.coGroup(ds2) .where(new KeySelector>() { private static final long serialVersionUID = 1L; @@ -468,7 +468,7 @@ public class CoGroupITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet coGroupDs = ds.coGroup(ds2) .where(new KeySelector() { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd31185e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java index e8d8be9..bfae922 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java @@ -48,7 +48,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; @RunWith(Parameterized.class) public class JoinITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 21; + private static int NUM_PROGRAMS = 22; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -505,7 +505,7 @@ public class JoinITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6"); @@ -525,7 +525,7 @@ public class JoinITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference! @@ -544,7 +544,7 @@ public class JoinITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1"); @@ -565,7 +565,7 @@ public class JoinITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2"); @@ -586,7 +586,7 @@ public class JoinITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4"); @@ -649,8 +649,8 @@ public class JoinITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); - DataSet >> joinDs = + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env); + DataSet >> joinDs = ds1.join(ds2).where("*").equalTo("*"); joinDs.writeAsCsv(resultPath); @@ -658,9 +658,9 @@ public class JoinITCase extends JavaProgramTestBase { env.execute(); // return expected result - return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + - "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + - "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; + return "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+ + "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+ + "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n"; } default: throw new IllegalArgumentException("Invalid program id: "+progId); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd31185e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index 6757f66..0f8097a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -286,17 +287,32 @@ public class CollectionDataSets { } } - public static DataSet> getSmallTuplebasedPojoMatchingDataSet(ExecutionEnvironment env) { + public static DataSet> getSmallTuplebasedDataSet(ExecutionEnvironment env) { List> data = new ArrayList>(); data.add(new Tuple7(1, "First", 10, 100, 1000L, "One", 10000L)); data.add(new Tuple7(2, "Second", 20, 200, 2000L, "Two", 20000L)); data.add(new Tuple7(3, "Third", 30, 300, 3000L, "Three", 30000L)); return env.fromCollection(data); } + + public static DataSet> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) { + List> data = + new ArrayList>(); + data.add(new Tuple7 + (10000L, 10, 100, 1000L, "One", 1, "First")); + + data.add(new Tuple7 + (20000L, 20, 200, 2000L, "Two", 2, "Second")); + + data.add(new Tuple7 + (30000L, 30, 300, 3000L, "Three", 3, "Third")); + + return env.fromCollection(data); + } public static DataSet getSmallPojoDataSet(ExecutionEnvironment env) { List data = new ArrayList(); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); + data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/)); data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L)); data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); return env.fromCollection(data); @@ -320,7 +336,6 @@ public class CollectionDataSets { public String str; public Tuple2 nestedTupleWithCustom; public NestedPojo nestedPojo; - public Date date; public transient Long ignoreMe; public POJO(int i0, String s0, @@ -330,7 +345,6 @@ public class CollectionDataSets { this.str = s0; this.nestedTupleWithCustom = new Tuple2(i1, new CustomType(i2, l0, s1)); this.nestedPojo = new NestedPojo(); - this.date = new Date(); this.nestedPojo.longNumber = l1; }