flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [2/4] flink git commit: [FLINK-1450] Added GroupFoldFunction and GroupedFoldInvokable with a test. Integrated them into DataStream
Date Sun, 15 Mar 2015 09:57:14 GMT
[FLINK-1450] Added GroupFoldFunction and GroupedFoldInvokable with a test. Integrated them into DataStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f785c73e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f785c73e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f785c73e

Branch: refs/heads/master
Commit: f785c73e81348c062578319588ce24194c0a2061
Parents: 6c99cb6
Author: Akshay Dixit <akshaydixi@gmail.com>
Authored: Sat Mar 14 02:22:38 2015 +0530
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sat Mar 14 19:29:53 2015 +0100

----------------------------------------------------------------------
 .../api/common/functions/GroupFoldFunction.java | 56 +++++++++++++++++
 .../api/datastream/GroupedDataStream.java       | 29 +++++++++
 .../operator/GroupedFoldInvokable.java          | 61 +++++++++++++++++++
 .../invokable/operator/StreamFoldInvokable.java |  2 +-
 .../operator/GroupedFoldInvokableTest.java      | 63 ++++++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  | 43 +++++++++----
 6 files changed, 242 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java
new file mode 100644
index 0000000..fb59c89
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * The interface for group fold functions. GroupFoldFunctions process groups of elements.
+ * They may aggregate them to a single value, or produce multiple result values for each group.
+ * The group may be defined by sharing a common grouping key, or the group may simply be
+ * all elements of a data set.
+ * <p>
+ * For a fold function that works incrementally by combining always two elements, see
+ * {@link FoldFunction}.
+ * <p>
+ * The basic syntax for using a grouped GroupFoldFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ *
+ * X initialValue = ...;
+ * DataSet<X> result = input.groupBy(<key-definition>).foldGroup(new MyGroupFoldFunction(), initialValue);
+ * </blockquote></pre>
+ *
+ * @param <T> Type of the initial input and the returned element
+ * @param <O> Type of the elements that the group/list/stream contains
+ */
+public interface GroupFoldFunction<T, O> extends Function, Serializable {
+	/**
+	 * The fold method. The function receives one call per group of elements.
+	 *
+	 * @param values All records that belong to the given input key.
+	 * @param out The collector to hand results to.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void fold(Iterable<T> values, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index c871c20..50ae542 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -17,10 +17,14 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.invokable.operator.GroupedFoldInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 
@@ -71,6 +75,31 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	}
 
 	/**
+	 * Applies a fold transformation on the grouped data stream grouped on by
+	 * the given key position. The {@link FoldFunction} will receive input
+	 * values based on the key value. Only input values with the same key will
+	 * go to the same folder.The user can also extend
+	 * {@link RichFoldFunction} to gain access to other features provided by
+	 * the {@link RichFuntion} interface.
+	 *
+	 * @param folder
+	 *            The {@link FoldFunction} that will be called for every
+	 *            element of the input values with the same key.
+	 * @param initialValue
+	 *            The initialValue passed to the folders for each key.
+	 * @return The transformed DataStream.
+	 */
+
+	@Override
+	public <R> SingleOutputStreamOperator<R, ?> fold(FoldFunction<R, OUT> folder, R initialValue) {
+
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+
+		return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder), keySelector,
+				initialValue));
+	}
+
+	/**
 	 * Applies an aggregation that sums the grouped data stream at the given
 	 * position, grouped by the given key position. Input values with the same
 	 * key will be summed.

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
new file mode 100644
index 0000000..3263955
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private KeySelector<IN, ?> keySelector;
+	private Map<Object, OUT> values;
+	private OUT folded;
+	private OUT initialValue;
+
+	public GroupedFoldInvokable(FoldFunction<OUT, IN> folder, KeySelector<IN, ?> keySelector, OUT initialValue) {
+		super(folder, initialValue);
+		this.keySelector = keySelector;
+		this.initialValue = initialValue;
+		values = new HashMap<Object, OUT>();
+	}
+
+	@Override
+	protected void fold() throws Exception {
+		Object key = nextRecord.getKey(keySelector);
+		accumulator = values.get(key);
+		nextValue = nextObject;
+		if (accumulator != null) {
+			callUserFunctionAndLogException();
+			values.put(key, folded);
+			collector.collect(folded);
+		} else {
+			values.put(key, initialValue);
+			collector.collect(initialValue);
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		folded = folder.fold(accumulator, nextValue);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
index 36b75f2..10912db 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@@ -49,7 +49,7 @@ public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	protected void callUserFunction() throws Exception {
 
 		nextValue = nextObject;
-		accumulator = folder.fold(accumulator, copy(nextValue));
+		accumulator = folder.fold(accumulator, nextValue);
 		collector.collect(accumulator);
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
new file mode 100644
index 0000000..c8209f9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.util.MockContext;
+
+import org.junit.Test;
+
+public class GroupedFoldInvokableTest {
+
+	private static class MyFolder implements FoldFunction<String, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String fold(String accumulator, Integer value) throws Exception {
+			return accumulator + value.toString();
+		}
+
+	}
+
+	@Test
+	public void test() {
+		GroupedFoldInvokable<Integer, String> invokable1 = new GroupedFoldInvokable<Integer, String>(
+				new MyFolder(), new KeySelector<Integer, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String getKey(Integer value) throws Exception {
+				return value.toString();
+			}
+		}, "100");
+
+		List<String> expected = Arrays.asList("100", "1001", "100", "1002", "100");
+		List<String> actual = MockContext.createAndExecute(invokable1,
+				Arrays.asList(1, 1, 2, 2, 3));
+
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0b2b2b9..1663c8a 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -424,31 +424,52 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
    * Creates a new [[DataStream]] by folding the elements of this DataStream
-   * using an associative reduce function and an initial value.
+   * using an associative fold function and an initial value.
    */
-  def fold[R: TypeInformation: ClassTag](folder: FoldFunction[R,T], initialValue: R): DataStream[R] = {
+  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[R,T]): DataStream[R] = {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    javaStream.transform("fold", implicitly[TypeInformation[R]],
+    javaStream match {
+      case ds: GroupedDataStream[_] => javaStream.transform("fold",
+        implicitly[TypeInformation[R]], new GroupedFoldInvokable[T,R](folder, ds.getKeySelector(), initialValue))
+      case _ => javaStream.transform("fold", implicitly[TypeInformation[R]],
         new StreamFoldInvokable[T,R](folder, initialValue))
+    }
   }
 
-
   /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function.
+   * Creates a new [[DataStream]] by folding the elements of this DataStream
+   * using an associative fold function and an initial value.
    */
-  def reduce(fun: (T, T) => T): DataStream[T] = {
+  def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
     if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
+      throw new NullPointerException("Fold function must not be null.")
     }
-    val reducer = new ReduceFunction[T] {
+    val folder = new FoldFunction[R,T] {
       val cleanFun = clean(fun)
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+
+      def fold(acc: R, v: T) = {
+        cleanFun(acc, v)
+      }
     }
-    reduce(reducer)
+    fold(initialValue, folder)
   }
 
   /**


Mime
View raw message