Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A1465119D3 for ; Mon, 8 Sep 2014 14:19:22 +0000 (UTC) Received: (qmail 76346 invoked by uid 500); 8 Sep 2014 14:19:22 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 76323 invoked by uid 500); 8 Sep 2014 14:19:22 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 76314 invoked by uid 99); 8 Sep 2014 14:19:22 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Sep 2014 14:19:22 +0000 X-ASF-Spam-Status: No, hits=-2001.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 08 Sep 2014 14:18:55 +0000 Received: (qmail 76166 invoked by uid 99); 8 Sep 2014 14:18:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Sep 2014 14:18:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 45B3FA0E638; Mon, 8 Sep 2014 14:18:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.incubator.apache.org Date: Mon, 08 Sep 2014 14:18:52 -0000 Message-Id: <2cfc5233dc8343fd9bd54f8ed885ceec@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: [FLINK-925] Support KeySelector function returning Tuple types X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-flink Updated Branches: refs/heads/master c0c2abda5 -> 122c9b023 [FLINK-925] Support KeySelector function returning Tuple types Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fb3bdeac Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fb3bdeac Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fb3bdeac Branch: refs/heads/master Commit: fb3bdeac0b0c73e905945a1ecdbf29bf83ba3a6e Parents: c0c2abd Author: TobiasWiens Authored: Sun Jul 6 17:47:00 2014 +0200 Committer: Fabian Hueske Committed: Mon Sep 8 16:17:32 2014 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/DataSet.java | 3 +- .../flink/api/java/operators/JoinOperator.java | 2 +- .../flink/api/java/typeutils/TupleTypeInfo.java | 17 +- .../java/typeutils/runtime/TupleComparator.java | 3 +- .../runtime/TupleComparatorTTT1Test.java | 155 +++++++++++++++++++ .../runtime/TupleComparatorTTT2Test.java | 153 ++++++++++++++++++ .../runtime/TupleComparatorTTT3Test.java | 154 ++++++++++++++++++ 7 files changed, 481 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index ca2a5e9..4688349 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -456,7 +456,8 @@ public abstract class DataSet { * @see org.apache.flink.api.java.operators.GroupReduceOperator * @see DataSet */ - public > UnsortedGrouping groupBy(KeySelector keyExtractor) { + + public UnsortedGrouping groupBy(KeySelector keyExtractor) { return new UnsortedGrouping(this, new Keys.SelectorFunctionKeys(keyExtractor, getType())); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index 2efe7e9..1ca2ec9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -756,7 +756,7 @@ public abstract class JoinOperator extends TwoInputUdfOperator> JoinOperatorSetsPredicate where(KeySelector keySelector) { + public JoinOperatorSetsPredicate where(KeySelector keySelector) { return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys(keySelector, input1.getType())); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java index 737be56..94d3252 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -120,7 +120,7 @@ public class TupleTypeInfo extends TypeInformation implement } // special case for tuples where field zero is the key field - if (logicalKeyFields.length == 1 && logicalKeyFields[0] == 0) { + if (logicalKeyFields.length == 1 && logicalKeyFields[0] == 0 && !types[0].isTupleType()) { return createLeadingFieldComparator(orders[0], types[0]); } @@ -141,8 +141,21 @@ public class TupleTypeInfo extends TypeInformation implement int keyPos = logicalKeyFields[i]; if (types[keyPos].isKeyType() && types[keyPos] instanceof AtomicType) { fieldComparators[i] = ((AtomicType) types[keyPos]).createComparator(orders[i]); + } else if(types[keyPos].isTupleType() && types[keyPos] instanceof TupleTypeInfo){ // Check for tuple + TupleTypeInfo tupleType = (TupleTypeInfo) types[keyPos]; + + // All fields are key + int[] allFieldsKey = new int[tupleType.types.length]; + for(int h = 0; h < tupleType.types.length; h++){ + allFieldsKey[h]=h; + } + + // Prepare order + boolean[] tupleOrders = new boolean[tupleType.types.length]; + Arrays.fill(tupleOrders, orders[i]); + fieldComparators[i] = tupleType.createComparator(allFieldsKey, tupleOrders); } else { - throw new IllegalArgumentException("The field at position " + i + " (" + types[keyPos] + ") is no atomic key type."); + throw new IllegalArgumentException("The field at position " + i + " (" + types[keyPos] + ") is no atomic key type nor tuple type."); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java index 48cf08b..9de9824 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java @@ -236,8 +236,7 @@ public final class TupleComparator extends TypeComparator im try { for (; i < keyPositions.length; i++) { int keyPos = keyPositions[i]; - @SuppressWarnings("unchecked") - int cmp = comparators[i].compare((T)first.getFieldNotNull(keyPos), (T)second.getFieldNotNull(keyPos)); + int cmp = comparators[i].compare(first.getFieldNotNull(keyPos), second.getFieldNotNull(keyPos)); if (cmp != 0) { return cmp; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java new file mode 100644 index 0000000..d406529 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import static org.junit.Assert.assertEquals; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.DoubleComparator; +import org.apache.flink.api.common.typeutils.base.DoubleSerializer; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongComparator; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringComparator; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; + +public class TupleComparatorTTT1Test extends TupleComparatorTestBase, Tuple2, Tuple2>> { + + @SuppressWarnings("unchecked") + Tuple3, Tuple2, Tuple2>[] dataISD = new Tuple3[]{ + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 1.0), new Tuple2(1L, 1L), new Tuple2(4, -10L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 2.0), new Tuple2(1L, 2L), new Tuple2(4, -5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 3.0), new Tuple2(1L, 3L), new Tuple2(4, 0L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 3.5), new Tuple2(1L, 4L), new Tuple2(4, 5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 4325.12), new Tuple2(1L, 5L), new Tuple2(4, 15L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 1.0), new Tuple2(2L, 4L), new Tuple2(45, -5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 2.0), new Tuple2(2L, 6L), new Tuple2(45, 5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 3.0), new Tuple2(2L, 8L), new Tuple2(323, 2L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 3.5), new Tuple2(2L, 9L), new Tuple2(323, 5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 4325.12), new Tuple2(2L, 123L), new Tuple2(555, 1L)) + + }; + + @SuppressWarnings("unchecked") + @Override + protected TupleComparator, Tuple2, Tuple2>> createComparator( + boolean ascending) { + return new TupleComparator, Tuple2, Tuple2>>( + new int[] { 0 }, + new TypeComparator[] { + new TupleComparator>( + new int[] { 0, 1 }, + new TypeComparator[] { + new StringComparator(ascending), + new DoubleComparator(ascending) }, + new TypeSerializer[] { + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE }), + new TupleComparator>( + new int[] { 0, 1 }, + new TypeComparator[] { + new LongComparator(ascending), + new LongComparator(ascending) }, + new TypeSerializer[] { + LongSerializer.INSTANCE, + LongSerializer.INSTANCE }), + new TupleComparator>( + new int[] { 0, 1 }, + new TypeComparator[] { + new IntComparator(ascending), + new LongComparator(ascending) }, + new TypeSerializer[] { + IntSerializer.INSTANCE, + LongSerializer.INSTANCE }) }, + new TypeSerializer[] { + new TupleSerializer>( + (Class>) (Class) Tuple2.class, + new TypeSerializer[] { + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE }), + new TupleSerializer>( + (Class>) (Class) Tuple2.class, + new TypeSerializer[] { + LongSerializer.INSTANCE, + LongSerializer.INSTANCE }), + new TupleSerializer>( + (Class>) (Class) Tuple2.class, + new TypeSerializer[] { + IntSerializer.INSTANCE, + LongSerializer.INSTANCE }) }); + } + + @SuppressWarnings("unchecked") + @Override + protected TupleSerializer, Tuple2, Tuple2>> createSerializer() { + return new TupleSerializer, Tuple2, Tuple2>>( + (Class, Tuple2, Tuple2>>) (Class) Tuple3.class, + new TypeSerializer[]{ + new TupleSerializer> ( + (Class>) (Class) Tuple2.class, + new TypeSerializer[]{ + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE + }), + new TupleSerializer> ( + (Class>) (Class) Tuple2.class, + new TypeSerializer[]{ + LongSerializer.INSTANCE, + LongSerializer.INSTANCE + }), + new TupleSerializer> ( + (Class>) (Class) Tuple2.class, + new TypeSerializer[]{ + IntSerializer.INSTANCE, + LongSerializer.INSTANCE + }) + }); + } + + @Override + protected Tuple3, Tuple2, Tuple2>[] getSortedTestData() { + return this.dataISD; + } + + @Override + protected void deepEquals( + String message, + Tuple3, Tuple2, Tuple2> should, + Tuple3, Tuple2, Tuple2> is) { + + for (int x = 0; x < should.getArity(); x++) { + // Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields. + if(should.getField(x) instanceof Tuple2) { + this.deepEquals(message, (Tuple2) should.getField(x), (Tuple2)is.getField(x)); + } + else { + assertEquals(message, should.getField(x), is.getField(x)); + } + }// For + } + + protected void deepEquals(String message, Tuple2 should, Tuple2 is) { + for (int x = 0; x < should.getArity(); x++) { + assertEquals(message, should.getField(x), is.getField(x)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java new file mode 100644 index 0000000..11f3a5e --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.DoubleComparator; +import org.apache.flink.api.common.typeutils.base.DoubleSerializer; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongComparator; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringComparator; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; + +public class TupleComparatorTTT2Test extends TupleComparatorTestBase, Tuple2, Tuple2>> { + + @SuppressWarnings("unchecked") + Tuple3, Tuple2, Tuple2>[] dataISD = new Tuple3[]{ + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 1.0), new Tuple2(1L, 1L), new Tuple2(4, -10L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 2.0), new Tuple2(1L, 2L), new Tuple2(4, -5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 3.0), new Tuple2(1L, 3L), new Tuple2(4, 0L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 3.5), new Tuple2(1L, 4L), new Tuple2(4, 5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 4325.12), new Tuple2(1L, 5L), new Tuple2(4, 15L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 1.0), new Tuple2(2L, 4L), new Tuple2(45, -5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 2.0), new Tuple2(2L, 6L), new Tuple2(45, 5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 3.0), new Tuple2(2L, 8L), new Tuple2(323, 2L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 3.5), new Tuple2(2L, 9L), new Tuple2(323, 5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 4325.12), new Tuple2(2L, 123L), new Tuple2(555, 1L)) + + }; + + @SuppressWarnings("unchecked") + @Override + protected TupleComparator, Tuple2, Tuple2>> createComparator( + boolean ascending) { + return new TupleComparator, Tuple2, Tuple2>>( + new int[] { 0, 2 }, + new TypeComparator[] { + new TupleComparator>( + new int[] { 0, 1 }, + new TypeComparator[] { + new StringComparator(ascending), + new DoubleComparator(ascending) }, + new TypeSerializer[] { + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE }), + new TupleComparator>( + new int[] { 0, 1 }, + new TypeComparator[] { + new LongComparator(ascending), + new LongComparator(ascending) }, + new TypeSerializer[] { + LongSerializer.INSTANCE, + LongSerializer.INSTANCE }), + new TupleComparator>( + new int[] { 0, 1 }, + new TypeComparator[] { + new IntComparator(ascending), + new LongComparator(ascending) }, + new TypeSerializer[] { + IntSerializer.INSTANCE, + LongSerializer.INSTANCE }) }, + new TypeSerializer[] { + new TupleSerializer>( + (Class>) (Class) Tuple2.class, + new TypeSerializer[] { + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE }), + new TupleSerializer>( + (Class>) (Class) Tuple2.class, + new TypeSerializer[] { + LongSerializer.INSTANCE, + LongSerializer.INSTANCE }), + new TupleSerializer>( + (Class>) (Class) Tuple2.class, + new TypeSerializer[] { + IntSerializer.INSTANCE, + LongSerializer.INSTANCE }) }); + } + + @SuppressWarnings("unchecked") + @Override + protected TupleSerializer, Tuple2, Tuple2>> createSerializer() { + return new TupleSerializer, Tuple2, Tuple2>>( + (Class, Tuple2, Tuple2>>) (Class) Tuple3.class, + new TypeSerializer[]{ + new TupleSerializer> ( + (Class>) (Class) Tuple2.class, + new TypeSerializer[]{ + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE}), + new TupleSerializer> ( + (Class>) (Class) Tuple2.class, + new TypeSerializer[]{ + LongSerializer.INSTANCE, + LongSerializer.INSTANCE}), + new TupleSerializer> ( + (Class>) (Class) Tuple2.class, + new TypeSerializer[]{ + IntSerializer.INSTANCE, + LongSerializer.INSTANCE}) + }); + } + + @Override + protected Tuple3, Tuple2, Tuple2>[] getSortedTestData() { + return this.dataISD; + } + + @Override + protected void deepEquals( + String message, + Tuple3, Tuple2, Tuple2> should, + Tuple3, Tuple2, Tuple2> is) { + + for (int x = 0; x < should.getArity(); x++) { + // Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields. + if(should.getField(x) instanceof Tuple2) { + this.deepEquals(message, (Tuple2) should.getField(x), (Tuple2)is.getField(x)); + } + else { + assertEquals(message, should.getField(x), is.getField(x)); + } + }// For + } + + protected void deepEquals(String message, Tuple2 should, Tuple2 is) { + for (int x = 0; x < should.getArity(); x++) { + assertEquals(message, should.getField(x), is.getField(x)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java new file mode 100644 index 0000000..1339bca --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import static org.junit.Assert.assertEquals; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.DoubleComparator; +import org.apache.flink.api.common.typeutils.base.DoubleSerializer; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongComparator; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringComparator; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; + +public class TupleComparatorTTT3Test extends TupleComparatorTestBase, Tuple2, Tuple2>>{ + @SuppressWarnings("unchecked") + Tuple3, Tuple2, Tuple2>[] dataISD = new Tuple3[]{ + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 1.0), new Tuple2(1L, 1L), new Tuple2(4, -10L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 2.0), new Tuple2(1L, 2L), new Tuple2(4, -5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 3.0), new Tuple2(1L, 3L), new Tuple2(4, 0L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 3.5), new Tuple2(1L, 4L), new Tuple2(4, 5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("hello", 4325.12), new Tuple2(1L, 5L), new Tuple2(4, 15L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 1.0), new Tuple2(2L, 4L), new Tuple2(45, -5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 2.0), new Tuple2(2L, 6L), new Tuple2(45, 5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 3.0), new Tuple2(2L, 8L), new Tuple2(323, 2L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 3.5), new Tuple2(2L, 9L), new Tuple2(323, 5L)), + new Tuple3, Tuple2, Tuple2>(new Tuple2("world", 4325.12), new Tuple2(2L, 123L), new Tuple2(555, 1L)) + + }; + + @SuppressWarnings("unchecked") + @Override + protected TupleComparator, Tuple2, Tuple2>> createComparator( + boolean ascending) { + return new TupleComparator, Tuple2, Tuple2>>( + new int[] { 0, 1, 2 }, + new TypeComparator[] { + new TupleComparator>( + new int[] { 0, 1 }, + new TypeComparator[] { + new StringComparator(ascending), + new DoubleComparator(ascending) }, + new TypeSerializer[] { + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE }), + new TupleComparator>( + new int[] { 0, 1 }, + new TypeComparator[] { + new LongComparator(ascending), + new LongComparator(ascending) }, + new TypeSerializer[] { + LongSerializer.INSTANCE, + LongSerializer.INSTANCE }), + new TupleComparator>( + new int[] { 0, 1 }, + new TypeComparator[] { + new IntComparator(ascending), + new LongComparator(ascending) }, + new TypeSerializer[] { + IntSerializer.INSTANCE, + LongSerializer.INSTANCE }) }, + new TypeSerializer[] { + new TupleSerializer>( + (Class>) (Class) Tuple2.class, + new TypeSerializer[] { + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE }), + new TupleSerializer>( + (Class>) (Class) Tuple2.class, + new TypeSerializer[] { + LongSerializer.INSTANCE, + LongSerializer.INSTANCE }), + new TupleSerializer>( + (Class>) (Class) Tuple2.class, + new TypeSerializer[] { + IntSerializer.INSTANCE, + LongSerializer.INSTANCE }) }); + } + + @SuppressWarnings("unchecked") + @Override + protected TupleSerializer, Tuple2, Tuple2>> createSerializer() { + return new TupleSerializer, Tuple2, Tuple2>>( + (Class, Tuple2, Tuple2>>) (Class) Tuple3.class, + new TypeSerializer[]{ + new TupleSerializer> ( + (Class>) (Class) Tuple2.class, + new TypeSerializer[]{ + StringSerializer.INSTANCE, + DoubleSerializer.INSTANCE + }), + new TupleSerializer> ( + (Class>) (Class) Tuple2.class, + new TypeSerializer[]{ + LongSerializer.INSTANCE, + LongSerializer.INSTANCE + }), + new TupleSerializer> ( + (Class>) (Class) Tuple2.class, + new TypeSerializer[]{ + IntSerializer.INSTANCE, + LongSerializer.INSTANCE + }) + }); + } + + @Override + protected Tuple3, Tuple2, Tuple2>[] getSortedTestData() { + return this.dataISD; + } + + @Override + protected void deepEquals( + String message, + Tuple3, Tuple2, Tuple2> should, + Tuple3, Tuple2, Tuple2> is) { + + for (int x = 0; x < should.getArity(); x++) { + // Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields. + if(should.getField(x) instanceof Tuple2) { + this.deepEquals(message, (Tuple2) should.getField(x), (Tuple2)is.getField(x)); + } + else { + assertEquals(message, should.getField(x), is.getField(x)); + } + }// For + } + + protected void deepEquals(String message, Tuple2 should, Tuple2 is) { + for (int x = 0; x < should.getArity(); x++) { + assertEquals(message, should.getField(x), is.getField(x)); + } + } +}