From: sewen@apache.org
To: commits@flink.incubator.apache.org
Subject: git commit: [FLINK-1020] Introduce minBy and maxBy
Date: Tue, 2 Sep 2014 13:00:31 +0000 (UTC)

Repository: incubator-flink
Updated Branches:
  refs/heads/master 4d4151d53 -> d60a3169f

[FLINK-1020] Introduce minBy and maxBy

This closes #101

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d60a3169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d60a3169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d60a3169

Branch: refs/heads/master
Commit: d60a3169fec671f9176f29c5a9a5c66142d1425f
Parents: 4d4151d
Author: TobiasWiens
Authored: Fri Aug 22 12:10:37 2014 +0200
Committer: Stephan Ewen
Committed: Tue Sep 2 12:59:45 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  46 ++++
 .../api/java/functions/SelectByMaxFunction.java |  91 ++++++++
 .../api/java/functions/SelectByMinFunction.java |  91 ++++++++
 .../api/java/operators/UnsortedGrouping.java    |  47 +++-
 .../java/functions/SelectByFunctionsTest.java   | 184 +++++++++++++++
 .../api/java/operator/MaxByOperatorTest.java    | 229 +++++++++++++++++++
 .../api/java/operator/MinByOperatorTest.java    | 229 +++++++++++++++++++
 7 files changed, 915 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index d25e64b..ca2a5e9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java; import org.apache.commons.lang3.Validate; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; @@ -31,6 +32,8 @@ import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.functions.FormattingMapper; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.functions.SelectByMaxFunction; +import org.apache.flink.api.java.functions.SelectByMinFunction; import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; import org.apache.flink.api.java.io.CsvOutputFormat; import org.apache.flink.api.java.io.PrintingOutputFormat; @@ -59,6 +62,7 @@ import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.types.TypeInformation; @@ -339,6 +343,48 @@ public abstract class DataSet { return new GroupReduceOperator(this, reducer); } +/** + * Applies a special case of a reduce transformation (minBy) on a non-grouped {@link DataSet}.
+ * The transformation consecutively calls a {@link ReduceFunction} + * until only a single element remains which is the result of the transformation. + * A ReduceFunction combines two elements into one new element of the same type. + * + * @param fields Keys taken into account for finding the minimum. + * @return A {@link ReduceOperator} representing the minimum. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public ReduceOperator minBy(int... fields) { + + // Check for using a tuple + if(!this.type.isTupleType()) { + throw new InvalidProgramException("Method minBy(int) only works on tuples."); + } + + return new ReduceOperator(this, new SelectByMinFunction( + (TupleTypeInfo) this.type, fields)); + } + + /** + * Applies a special case of a reduce transformation (maxBy) on a non-grouped {@link DataSet}.
+ * The transformation consecutively calls a {@link ReduceFunction} + * until only a single element remains which is the result of the transformation. + * A ReduceFunction combines two elements into one new element of the same type. + * + * @param fields Keys taken into account for finding the minimum. + * @return A {@link ReduceOperator} representing the minimum. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public ReduceOperator maxBy(int... fields) { + + // Check for using a tuple + if(!this.type.isTupleType()) { + throw new InvalidProgramException("Method maxBy(int) only works on tuples."); + } + + return new ReduceOperator(this, new SelectByMaxFunction( + (TupleTypeInfo) this.type, fields)); + } + // -------------------------------------------------------------------------------------------- // distinct // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java new file mode 100644 index 0000000..614676e --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; + + +public class SelectByMaxFunction implements ReduceFunction { + private static final long serialVersionUID = 1L; + + // Fields which are used as KEYS + private int[] fields; + + /** + * Constructor which is overwriting the default constructor. + * @param type Types of tuple whether to check if given fields are key types. + * @param fields Array of integers which are used as key for comparison. The order of indexes + * is regarded in the reduce function. First index has highest priority and last index has + * least priority. + */ + public SelectByMaxFunction(TupleTypeInfo type, int... 
fields) { + this.fields = fields; + + // Check correctness of each position + for (int field : fields) { + // Is field inside array + if (field < 0 || field >= type.getArity()) { + throw new IndexOutOfBoundsException( + "MinReduceFunction field position " + field + " is out of range."); + } + + // Check whether type is comparable + if (!type.getTypeAt(field).isKeyType()) { + throw new java.lang.IllegalArgumentException( + "MinReduceFunction supports only key(Comparable) types."); + } + + } + } + + /** + * Reduce implementation, returns bigger tuple or value1 if both tuples are + * equal. Comparison highly depends on the order and amount of fields chosen + * as indices. All given fields (at construction time) are checked in the same + * order as defined (at construction time). If both tuples are equal in one + * index, the next index is compared. Or if no next index is available value1 + * is returned. + * The tuple which has a bigger value at one index will be returned. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public T reduce(T value1, T value2) throws Exception { + + for (int index = 0; index < fields.length; index++) { + // Save position of compared key + int position = this.fields[index]; + + // Get both values - both implement comparable + Comparable comparable1 = value1.getField(position); + Comparable comparable2 = value2.getField(position); + + // Compare values + int comp = comparable1.compareTo(comparable2); + // If comp is bigger than 0 comparable 1 is bigger. + // Return the smaller value. + if (comp > 0) { + return value1; + } else if (comp < 0) { + return value2; + } + } + return value1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java new file mode 100644 index 0000000..4b0a7bf --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; + + +public class SelectByMinFunction implements ReduceFunction { + private static final long serialVersionUID = 1L; + + // Fields which are used as KEYS + private int[] fields; + + /** + * Constructor which is overwriting the default constructor. 
+ * @param type Types of tuple whether to check if given fields are key types. + * @param fields Array of integers which are used as key for comparison. The order of indexes + * is regarded in the reduce function. First index has highest priority and last index has + * least priority. + */ + public SelectByMinFunction(TupleTypeInfo type, int... fields) { + this.fields = fields; + + // Check correctness of each position + for (int field : fields) { + // Is field inside array + if (field < 0 || field >= type.getArity()) { + throw new java.lang.IndexOutOfBoundsException( + "MinReduceFunction field position " + field + " is out of range."); + } + + // Check whether type is comparable + if (!type.getTypeAt(field).isKeyType()) { + throw new java.lang.IllegalArgumentException( + "MinReduceFunction supports only key(Comparable) types."); + } + + } + } + + /** + * Reduce implementation, returns smaller tuple or value1 if both tuples are + * equal. Comparison highly depends on the order and amount of fields chosen + * as indices. All given fields (at construction time) are checked in the same + * order as defined (at construction time). If both tuples are equal in one + * index, the next index is compared. Or if no next index is available value1 + * is returned. + * The tuple which has a smaller value at one index will be returned. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public T reduce(T value1, T value2) throws Exception { + + for (int index = 0; index < fields.length; index++) { + // Save position of compared key + int position = this.fields[index]; + + // Get both values - both implement comparable + Comparable comparable1 = value1.getField(position); + Comparable comparable2 = value2.getField(position); + + // Compare values + int comp = comparable1.compareTo(comparable2); + // If comp is smaller than 0 comparable 1 is smaller. + // Return the smaller value. 
+ if (comp < 0) { + return value1; + } else if (comp > 0) { + return value2; + } + } + return value1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 87b1454..702f149 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -18,14 +18,17 @@ package org.apache.flink.api.java.operators; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.aggregation.Aggregations; - import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.SelectByMaxFunction; +import org.apache.flink.api.java.functions.SelectByMinFunction; import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; public class UnsortedGrouping extends Grouping { @@ -133,7 +136,47 @@ public class UnsortedGrouping extends Grouping { return new GroupReduceOperator(this, reducer); } - + /** + * Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.
+ * The transformation consecutively calls a {@link ReduceFunction} + * until only a single element remains which is the result of the transformation. + * A ReduceFunction combines two elements into one new element of the same type. + * + * @param fields Keys taken into account for finding the minimum. + * @return A {@link ReduceOperator} representing the minimum. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public ReduceOperator minBy(int... fields) { + + // Check for using a tuple + if(!this.dataSet.getType().isTupleType()) { + throw new InvalidProgramException("Method minBy(int) only works on tuples."); + } + + return new ReduceOperator(this, new SelectByMinFunction( + (TupleTypeInfo) this.dataSet.getType(), fields)); + } + + /** + * Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.
+ * The transformation consecutively calls a {@link ReduceFunction} + * until only a single element remains which is the result of the transformation. + * A ReduceFunction combines two elements into one new element of the same type. + * + * @param fields Keys taken into account for finding the minimum. + * @return A {@link ReduceOperator} representing the minimum. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public ReduceOperator maxBy(int... fields) { + + // Check for using a tuple + if(!this.dataSet.getType().isTupleType()) { + throw new InvalidProgramException("Method maxBy(int) only works on tuples."); + } + + return new ReduceOperator(this, new SelectByMaxFunction( + (TupleTypeInfo) this.dataSet.getType(), fields)); + } // -------------------------------------------------------------------------------------------- // Group Operations // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java new file mode 100644 index 0000000..8bd9b04 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.BasicTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.junit.Assert; +import org.junit.Test; + +public class SelectByFunctionsTest { + + private final TupleTypeInfo> tupleTypeInfo = new TupleTypeInfo>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + private final Tuple5 bigger = new Tuple5(10, 100L, "HelloWorld", 200L, 20); + private final Tuple5 smaller = new Tuple5(5, 50L, "Hello", 50L, 15); + + //Special case where only the last value determines if bigger or smaller + private final Tuple5 specialCaseBigger = new Tuple5(10, 100L, "HelloWorld", 200L, 17); + private final Tuple5 specialCaseSmaller = new Tuple5(5, 50L, "Hello", 50L, 17); + + + /** + * This test validates whether the order of tuples has any impact on the outcome and if the bigger tuple is returned. 
+ */ + @Test + public void testMaxByComparison() { + SelectByMaxFunction> maxByTuple = new SelectByMaxFunction>(tupleTypeInfo, new int[] {0}); + + try { + Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(smaller, bigger)); + Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, smaller)); + } catch (Exception e) { + Assert.fail("No exception should be thrown while comapring both tuples"); + } + } + + // ----------------------- MAXIMUM FUNCTION TEST BELOW -------------------------- + + /** + * This test cases checks when two tuples only differ in one value, but this value is not + * in the fields list. In that case it should be seen as equal and then the first given tuple (value1) should be returned by reduce(). + */ + @Test + public void testMaxByComparisonSpecialCase1() { + SelectByMaxFunction> maxByTuple = new SelectByMaxFunction>(tupleTypeInfo, new int[] {0,3}); + + try { + Assert.assertSame("SelectByMax must return the first given tuple", specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger)); + Assert.assertSame("SelectByMax must return the first given tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger)); + } catch (Exception e) { + Assert.fail("No exception should be thrown while comapring both tuples"); + } + } + + /** + * This test cases checks when two tuples only differ in one value. + */ + @Test + public void testMaxByComparisonSpecialCase2() { + SelectByMaxFunction> maxByTuple = new SelectByMaxFunction>(tupleTypeInfo, new int[] {0,2,1,4,3}); + + try { + Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(specialCaseBigger, bigger)); + Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger)); + } catch (Exception e) { + Assert.fail("No exception should be thrown while comapring both tuples"); + } + } + + /** + * This test validates that equality is independent of the amount of used indices. + */ + @Test + public void testMaxByComparisonMultiple() { + SelectByMaxFunction> maxByTuple = new SelectByMaxFunction>(tupleTypeInfo, new int[] {0,1,2,3,4}); + + try { + Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(smaller, bigger)); + Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, smaller)); + } catch (Exception e) { + Assert.fail("No exception should be thrown while comapring both tuples"); + } + } + + /** + * Checks whether reduce does behave as expected if both values are the same object. + */ + @Test + public void testMaxByComparisonMustReturnATuple() { + SelectByMaxFunction> maxByTuple = new SelectByMaxFunction>(tupleTypeInfo, new int[] {0}); + + try { + Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, bigger)); + Assert.assertSame("SelectByMax must return smaller tuple", smaller, maxByTuple.reduce(smaller, smaller)); + } catch (Exception e) { + Assert.fail("No exception should be thrown while comapring both tuples"); + } + } + + // ----------------------- MINIMUM FUNCTION TEST BELOW -------------------------- + + /** + * This test validates whether the order of tuples has any impact on the outcome and if the smaller tuple is returned. 
+ */ + @Test + public void testMinByComparison() { + SelectByMinFunction> minByTuple = new SelectByMinFunction>(tupleTypeInfo, new int[] {0}); + + try { + Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, bigger)); + Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(bigger, smaller)); + } catch (Exception e) { + Assert.fail("No exception should be thrown while comapring both tuples"); + } + } + + /** + * This test cases checks when two tuples only differ in one value, but this value is not + * in the fields list. In that case it should be seen as equal and then the first given tuple (value1) should be returned by reduce(). + */ + @Test + public void testMinByComparisonSpecialCase1() { + SelectByMinFunction> minByTuple = new SelectByMinFunction>(tupleTypeInfo, new int[] {0,3}); + + try { + Assert.assertSame("SelectByMin must return the first given tuple", specialCaseBigger, minByTuple.reduce(specialCaseBigger, bigger)); + Assert.assertSame("SelectByMin must return the first given tuple", bigger, minByTuple.reduce(bigger, specialCaseBigger)); + } catch (Exception e) { + Assert.fail("No exception should be thrown while comapring both tuples"); + } + } + + /** + * This test validates that when two tuples only differ in one value and that value's index is given + * at construction time. The smaller tuple must be returned then. + */ + @Test + public void testMinByComparisonSpecialCase2() { + SelectByMinFunction> minByTuple = new SelectByMinFunction>(tupleTypeInfo, new int[] {0,2,1,4,3}); + + try { + Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(specialCaseSmaller, smaller)); + Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, specialCaseSmaller)); + } catch (Exception e) { + Assert.fail("No exception should be thrown while comapring both tuples"); + } + } + + /** + * Checks whether reduce does behave as expected if both values are the same object. + */ + @Test + public void testMinByComparisonMultiple() { + SelectByMinFunction> minByTuple = new SelectByMinFunction>(tupleTypeInfo, new int[] {0,1,2,3,4}); + + try { + Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, bigger)); + Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(bigger, smaller)); + } catch (Exception e) { + Assert.fail("No exception should be thrown while comapring both tuples"); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java new file mode 100644 index 0000000..85bc2e5 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.operator; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.UnsortedGrouping; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.BasicTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.junit.Test; + +public class MaxByOperatorTest { + + // TUPLE DATA + private final List> emptyTupleData = new ArrayList>(); + + private final TupleTypeInfo> tupleTypeInfo = new TupleTypeInfo>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + /** + * This test validates that no exceptions is thrown when an empty dataset + * calls maxBy(). + */ + @Test + public void testMaxByKeyFieldsDataset() { + + final ExecutionEnvironment env = ExecutionEnvironment + .getExecutionEnvironment(); + DataSet> tupleDs = env + .fromCollection(emptyTupleData, tupleTypeInfo); + + // should work + try { + tupleDs.maxBy(4, 0, 1, 2, 3); + } catch (Exception e) { + Assert.fail(); + } + } + + private final List customTypeData = new ArrayList(); + + /** + * This test validates that an InvalidProgrammException is thrown when maxBy + * is used on a custom data type. + */ + @Test(expected = InvalidProgramException.class) + public void testCustomKeyFieldsDataset() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + this.customTypeData.add(new CustomType()); + + DataSet customDs = env.fromCollection(customTypeData); + // should not work: groups on custom type + customDs.maxBy(0); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsDataset1() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should not work, key out of tuple bounds + tupleDs.maxBy(5); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsDataset2() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should not work, key out of tuple bounds + tupleDs.maxBy(-1); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. 
+ */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsDataset3() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should not work, key out of tuple bounds + tupleDs.maxBy(1,2,3,4,-1); + } + + //---------------------------- GROUPING TESTS BELOW -------------------------------------- + + /** + * This test validates that no exceptions is thrown when an empty grouping + * calls maxBy(). + */ + @Test + public void testMaxByKeyFieldsGrouping() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + UnsortedGrouping> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0); + + // should work + try { + groupDs.maxBy(4, 0, 1, 2, 3); + } catch (Exception e) { + Assert.fail(); + } + } + + /** + * This test validates that an InvalidProgrammException is thrown when maxBy + * is used on a custom data type. + */ + @Test(expected = InvalidProgramException.class) + public void testCustomKeyFieldsGrouping() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + this.customTypeData.add(new CustomType()); + + UnsortedGrouping groupDs = env.fromCollection(customTypeData).groupBy(0); + // should not work: groups on custom type + groupDs.maxBy(0); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsGrouping1() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + UnsortedGrouping> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0); + + // should not work, key out of tuple bounds + groupDs.maxBy(5); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsGrouping2() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + UnsortedGrouping> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0); + + // should not work, key out of tuple bounds + groupDs.maxBy(-1); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsGrouping3() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + UnsortedGrouping> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0); + + // should not work, key out of tuple bounds + groupDs.maxBy(1,2,3,4,-1); + } + + /** + * Custom data type, for testing purposes. 
+ */ + public static class CustomType implements Serializable { + + private static final long serialVersionUID = 1L; + + public int myInt; + public long myLong; + public String myString; + + public CustomType() { + }; + + public CustomType(int i, long l, String s) { + myInt = i; + myLong = l; + myString = s; + } + + @Override + public String toString() { + return myInt + "," + myLong + "," + myString; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java new file mode 100644 index 0000000..ff6a851 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.operator; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.UnsortedGrouping; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.BasicTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.junit.Test; + +public class MinByOperatorTest { + + // TUPLE DATA + private final List> emptyTupleData = new ArrayList>(); + + private final TupleTypeInfo> tupleTypeInfo = new TupleTypeInfo>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + /** + * This test validates that no exceptions is thrown when an empty dataset + * calls minBy(). + */ + @Test + public void testMinByKeyFieldsDataset() { + + final ExecutionEnvironment env = ExecutionEnvironment + .getExecutionEnvironment(); + DataSet> tupleDs = env + .fromCollection(emptyTupleData, tupleTypeInfo); + + // should work + try { + tupleDs.minBy(4, 0, 1, 2, 3); + } catch (Exception e) { + Assert.fail(); + } + } + + private final List customTypeData = new ArrayList(); + + /** + * This test validates that an InvalidProgrammException is thrown when minBy + * is used on a custom data type. 
+ */ + @Test(expected = InvalidProgramException.class) + public void testCustomKeyFieldsDataset() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + this.customTypeData.add(new CustomType()); + + DataSet customDs = env.fromCollection(customTypeData); + // should not work: groups on custom type + customDs.minBy(0); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsDataset1() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should not work, key out of tuple bounds + tupleDs.minBy(5); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsDataset2() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should not work, key out of tuple bounds + tupleDs.minBy(-1); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsDataset3() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should not work, key out of tuple bounds + tupleDs.minBy(1,2,3,4,-1); + } + + //---------------------------- GROUPING TESTS BELOW -------------------------------------- + + /** + * This test validates that no exceptions is thrown when an empty grouping + * calls minBy(). + */ + @Test + public void testMinByKeyFieldsGrouping() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + UnsortedGrouping> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0); + + // should work + try { + groupDs.minBy(4, 0, 1, 2, 3); + } catch (Exception e) { + Assert.fail(); + } + } + + /** + * This test validates that an InvalidProgrammException is thrown when minBy + * is used on a custom data type. + */ + @Test(expected = InvalidProgramException.class) + public void testCustomKeyFieldsGrouping() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + this.customTypeData.add(new CustomType()); + + UnsortedGrouping groupDs = env.fromCollection(customTypeData).groupBy(0); + // should not work: groups on custom type + groupDs.minBy(0); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsGrouping1() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + UnsortedGrouping> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0); + + // should not work, key out of tuple bounds + groupDs.minBy(5); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. 
+ */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsGrouping2() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + UnsortedGrouping> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0); + + // should not work, key out of tuple bounds + groupDs.minBy(-1); + } + + /** + * This test validates that an index which is out of bounds throws an + * IndexOutOfBOundsExcpetion. + */ + @Test(expected = IndexOutOfBoundsException.class) + public void testOutOfTupleBoundsGrouping3() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + UnsortedGrouping> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0); + + // should not work, key out of tuple bounds + groupDs.minBy(1,2,3,4,-1); + } + + /** + * Custom data type, for testing purposes. + */ + public static class CustomType implements Serializable { + + private static final long serialVersionUID = 1L; + + public int myInt; + public long myLong; + public String myString; + + public CustomType() { + }; + + public CustomType(int i, long l, String s) { + myInt = i; + myLong = l; + myString = s; + } + + @Override + public String toString() { + return myInt + "," + myLong + "," + myString; + } + } + +}
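----------------------------------------------------------------------

Usage sketch (not part of the patch): the snippet below illustrates how the new
minBy/maxBy operators are intended to be called on a tuple DataSet, both
non-grouped and after a groupBy. The sample data, the class name and the use of
fromElements/print/execute are illustrative assumptions and are not taken from
this commit.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class MinMaxByExample {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (user id, page, visit count) -- illustrative sample data
        DataSet<Tuple3<Integer, String, Integer>> visits = env.fromElements(
                new Tuple3<Integer, String, Integer>(1, "home", 10),
                new Tuple3<Integer, String, Integer>(1, "docs", 25),
                new Tuple3<Integer, String, Integer>(2, "home", 3));

        // Non-grouped: reduces the whole DataSet to the single tuple with the
        // smallest visit count (field 2).
        visits.minBy(2).print();

        // Grouped: per user id (field 0), keep the tuple with the largest visit
        // count; field 1 acts as a tie-breaker when the counts are equal.
        visits.groupBy(0).maxBy(2, 1).print();

        env.execute("minBy/maxBy example");
    }
}

As documented in SelectByMinFunction/SelectByMaxFunction, the field indices are
evaluated in the order given: the first index has the highest priority, and only
on a tie is the next index compared. If two tuples are equal in all given fields,
the first input (value1) is returned. Both operators require a tuple type and
throw an InvalidProgramException otherwise; out-of-range field positions raise an
IndexOutOfBoundsException.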