flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/3] git commit: [FLINK-970] Adds first() operation on DataSet, UnsortedGrouping, and SortedGrouping
Date Wed, 24 Sep 2014 16:22:24 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master e5731e0ed -> a3b02840d


[FLINK-970] Adds first() operation on DataSet, UnsortedGrouping, and SortedGrouping

This closes #88


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6702a2e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6702a2e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6702a2e3

Branch: refs/heads/master
Commit: 6702a2e317bee74930999ba26d50e75f555d75c5
Parents: e5731e0
Author: zentol <s.motsu@web.de>
Authored: Mon Jul 28 14:13:19 2014 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Sep 24 12:44:13 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 10 ++++
 .../flink/api/java/functions/FirstReducer.java  | 54 ++++++++++++++++++++
 .../api/java/operators/SortedGrouping.java      | 10 ++++
 .../api/java/operators/UnsortedGrouping.java    | 10 ++++
 4 files changed, 84 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index ff487a2..61cb429 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -50,6 +50,7 @@ import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.operators.FilterOperator;
+import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.operators.JoinOperator.JoinHint;
@@ -397,6 +398,15 @@ public abstract class DataSet<T> {
 				(TupleTypeInfo) this.type, fields));
 	}
 
+	/**
+	 * Returns a new set containing the first n elements in this {@link DataSet}.<br/>
+	 * The selection is performed by a {@link FirstReducer} applied through {@code reduceGroup}.
+	 *
+	 * @param n The desired number of elements; must be at least 1.
+	 * @return A GroupReduceOperator that represents the DataSet containing the elements.
+	 * @throws IllegalArgumentException if n is smaller than 1.
+	*/
+	public GroupReduceOperator<T, T> first(int n) {
+		// Asking for fewer than one element is a programming error; reject it up front
+		// instead of handing a meaningless limit to the reducer.
+		if (n < 1) {
+			throw new IllegalArgumentException("Parameter n of first(n) must be at least 1, but was " + n + ".");
+		}
+
+		return reduceGroup(new FirstReducer<T>(n));
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  distinct
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
new file mode 100644
index 0000000..890a0ca
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+/**
+ * Group-reduce function that forwards at most the first {@code n} elements of the
+ * values it is given and ignores the rest.
+ *
+ * <p>{@link #combine(Iterable, Collector)} applies the same truncation, so a combine
+ * step already caps what it passes on at {@code n} elements.</p>
+ *
+ * @param <T> The type of the elements.
+ */
+@Combinable
+public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T> {
+	private static final long serialVersionUID = 1L;
+
+	/** Upper bound on the number of elements emitted per call. */
+	private final int count;
+
+	/**
+	 * Creates a reducer that emits at most {@code n} elements per call.
+	 *
+	 * @param n The maximum number of elements to emit. A value of zero or less
+	 *          results in no elements being emitted.
+	 */
+	public FirstReducer(int n) {
+		this.count = n;
+	}
+
+	/**
+	 * Emits the first {@code count} elements of {@code values} in iteration order.
+	 *
+	 * @param values The elements to take from.
+	 * @param out The collector that receives the selected elements.
+	 */
+	@Override
+	public void reduce(Iterable<T> values, Collector<T> out) throws Exception {
+		// An element is emitted before the limit is checked, so a limit of zero or
+		// less has to be handled before entering the loop.
+		if (count <= 0) {
+			return;
+		}
+
+		int emitCnt = 0;
+		for (T val : values) {
+			out.collect(val);
+
+			// Stop right after the last wanted element so the iterator is not advanced further.
+			if (++emitCnt >= count) {
+				break;
+			}
+		}
+	}
+
+	/**
+	 * Applies the same truncation as {@link #reduce(Iterable, Collector)}.
+	 *
+	 * @param values The elements to take from.
+	 * @param out The collector that receives the selected elements.
+	 */
+	@Override
+	public void combine(Iterable<T> values, Collector<T> out) throws Exception {
+		reduce(values, out);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 24744e3..6da5a1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.java.functions.FirstReducer;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
@@ -92,6 +93,15 @@ public class SortedGrouping<T> extends Grouping<T> {
 	}
 
 	
+	/**
+	 * Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>
+	 * The selection is performed by a {@link FirstReducer} applied through {@code reduceGroup}.
+	 *
+	 * @param n The desired number of elements; must be at least 1.
+	 * @return A GroupReduceOperator that represents the DataSet containing the elements.
+	 * @throws IllegalArgumentException if n is smaller than 1.
+	*/
+	public GroupReduceOperator<T, T> first(int n) {
+		// Asking for fewer than one element is a programming error; reject it up front
+		// instead of handing a meaningless limit to the reducer.
+		if (n < 1) {
+			throw new IllegalArgumentException("Parameter n of first(n) must be at least 1, but was " + n + ".");
+		}
+
+		return reduceGroup(new FirstReducer<T>(n));
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Group Operations
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6702a2e3/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index e0b9bf3..55dec7e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.aggregation.Aggregations;
@@ -139,6 +140,15 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 
 		return new GroupReduceOperator<T, R>(this, resultType, reducer);
 	}
+	
+	/**
+	 * Returns a new set containing the first n elements in this grouped {@link DataSet}.<br/>
+	 * The selection is performed by a {@link FirstReducer} applied through {@code reduceGroup}.
+	 *
+	 * @param n The desired number of elements; must be at least 1.
+	 * @return A GroupReduceOperator that represents the DataSet containing the elements.
+	 * @throws IllegalArgumentException if n is smaller than 1.
+	*/
+	public GroupReduceOperator<T, T> first(int n) {
+		// Asking for fewer than one element is a programming error; reject it up front
+		// instead of handing a meaningless limit to the reducer.
+		if (n < 1) {
+			throw new IllegalArgumentException("Parameter n of first(n) must be at least 1, but was " + n + ".");
+		}
+
+		return reduceGroup(new FirstReducer<T>(n));
+	}
 
 	/**
 	 * Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.<br/>


Mime
View raw message