flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [1/4] flink git commit: [FLINK-1450] Added fold operator for the Streaming API
Date Sun, 15 Mar 2015 09:57:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1c269e2f2 -> aaa231bc0


[FLINK-1450] Added fold operator for the Streaming API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c99cb69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c99cb69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c99cb69

Branch: refs/heads/master
Commit: 6c99cb69665564d9731a5087559c9874efc9af37
Parents: 1c269e2
Author: Akshay Dixit <akshaydixi@gmail.com>
Authored: Thu Mar 12 23:19:31 2015 +0530
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sat Mar 14 19:29:44 2015 +0100

----------------------------------------------------------------------
 .../api/common/functions/FoldFunction.java      | 53 ++++++++++++++++
 .../api/common/functions/RichFoldFunction.java  | 39 ++++++++++++
 .../flink/api/java/typeutils/TypeExtractor.java | 11 ++++
 .../streaming/api/datastream/DataStream.java    | 21 +++++++
 .../invokable/operator/StreamFoldInvokable.java | 65 ++++++++++++++++++++
 .../api/invokable/operator/StreamFoldTest.java  | 53 ++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  | 18 +++++-
 7 files changed, 257 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
new file mode 100644
index 0000000..0228dc1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for Fold functions. Fold functions combine groups of elements into
+ * a single value by repeatedly applying a binary operation: starting from an initial
+ * accumulator value, each element of the group is folded into the accumulator, and the
+ * result becomes the accumulator for the next element. Unlike a reduce, the accumulator
+ * type ({@code T}) may differ from the element type ({@code O}).
+ * <p>
+ * The basic syntax for using a FoldFunction on a data stream is as follows
+ * (where {@code MyFoldFunction} implements {@code FoldFunction<Y, X>}):
+ * <pre><blockquote>
+ * DataStream<X> input = ...;
+ *
+ * Y initialValue = ...;
+ * DataStream<Y> result = input.fold(new MyFoldFunction(), initialValue);
+ * </blockquote></pre>
+ * <p>
+ * Like all functions, the FoldFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ *
+ * @param <T> Type of the initial value (the accumulator) and of the returned element
+ * @param <O> Type of the elements that the group/list/stream contains
+ */
+public interface FoldFunction<T, O> extends Function, Serializable {
+	/**
+	 * The core method of FoldFunction, combining the current accumulator with one element
+	 * into a new accumulator value. The result has the accumulator's type, which need not be
+	 * the element type. The fold function is applied consecutively to the values of a group,
+	 * passing the result of each call as the accumulator of the next call.
+	 *
+	 * @param accumulator The accumulator: the initial value on the first call, afterwards the
+	 *                    result of the previous call.
+	 * @param value The value from the group to "fold" into the accumulator.
+	 * @return The new accumulator value; after the last element this is the result of
+	 *         folding the whole group.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	T fold(T accumulator, O value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
new file mode 100644
index 0000000..5abee70
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link FoldFunction}. As a {@link RichFunction}, it gives access to
the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown
methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <T> Type of the initial input and the returned element
+ * @param <O> Type of the elements that the group/list/stream contains
+ */
+public abstract class RichFoldFunction<T, O> extends AbstractRichFunction implements
FoldFunction<T, O> {
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract T fold(T accumulator, O value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 5f52f98..c5e2e8f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -100,6 +101,16 @@ public class TypeExtractor {
 	{
 		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false,
true, inType, functionName, allowMissing);
 	}
+
+	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<OUT,
IN> foldInterface, TypeInformation<IN> inType)
+	{
+		return getFoldReturnTypes(foldInterface, inType, null, false);
+	}
+
+	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<OUT,
IN> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
+	{
+		return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false,
false, inType, functionName, allowMissing);
+	}
 	
 	
 	public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN,
OUT> mapPartitionInterface, TypeInformation<IN> inType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f7b5f07..29d7aba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -24,10 +24,12 @@ import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFoldFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -62,6 +64,7 @@ import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.StreamFoldInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
@@ -521,6 +524,24 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Applies a fold transformation on the data stream. The returned stream
+	 * contains all the intermediate values of the fold transformation. The
+	 * user can also extend the {@link RichFoldFunction} to gain access to
+	 * other features provided by the {@link org.apache.flink.api.common.functions.RichFunction}
+	 * interface
+	 *
+	 * @param folder
+	 *          The {@link FoldFunction} that will be called for every element
+	 *          of the input values.
+	 * @return The transformed DataStream
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> fold(FoldFunction<R, OUT>
folder, R initialValue) {
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+
+		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder),
initialValue));
+	}
+
+	/**
 	 * Applies a Filter transformation on a {@link DataStream}. The
 	 * transformation calls a {@link FilterFunction} for each element of the
 	 * DataStream and retains only those element for which the function returns

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
new file mode 100644
index 0000000..36b75f2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+/**
+ * Stream operator that folds every incoming element into a running accumulator using a
+ * {@link FoldFunction} and emits the updated accumulator after each element, so the output
+ * contains all intermediate fold results.
+ *
+ * @param <IN> Type of the incoming elements
+ * @param <OUT> Type of the accumulator and of the emitted elements
+ */
+public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	protected FoldFunction<OUT, IN> folder;
+	// Running fold result; starts as the user-supplied initial value.
+	protected OUT accumulator;
+	protected IN nextValue;
+
+	public StreamFoldInvokable(FoldFunction<OUT, IN> folder, OUT initialValue) {
+		super(folder);
+		this.folder = folder;
+		this.accumulator = initialValue;
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		while (isRunning && readNext() != null) {
+			fold();
+		}
+	}
+
+	/**
+	 * Folds the element currently held in {@code nextObject} into the accumulator.
+	 */
+	protected void fold() throws Exception {
+		callUserFunctionAndLogException();
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		nextValue = nextObject;
+		// The fold function always receives its own copy of the element.
+		accumulator = folder.fold(accumulator, copy(nextValue));
+		// NOTE(review): the accumulator object itself is emitted and is also the input of the
+		// next fold call; confirm downstream operators never mutate emitted records in place.
+		collector.collect(accumulator);
+	}
+
+	@Override
+	public void collect(IN record) {
+		if (isRunning) {
+			// No copy here: callUserFunction() already copies the element before handing it to
+			// the fold function, so copying here too would copy every chained record twice.
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
new file mode 100644
index 0000000..b07191c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class StreamFoldTest {
+
+	/** Appends the string form of each integer to the accumulated string. */
+	private static class MyFolder implements FoldFunction<String, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String fold(String accumulator, Integer value) throws Exception {
+			return accumulator + value.toString();
+		}
+	}
+
+	@Test
+	public void test() {
+		List<Integer> input = Arrays.asList(1, 1, 2, 3, 3);
+		// One output per input element: every intermediate accumulator is emitted.
+		List<String> expected = Arrays.asList("1", "11", "112", "1123", "11233");
+
+		StreamFoldInvokable<Integer, String> foldInvokable =
+				new StreamFoldInvokable<Integer, String>(new MyFolder(), "");
+		List<String> actual = MockContext.createAndExecute(foldInvokable, input);
+
+		assertEquals(expected, actual);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 15467bb..0b2b2b9 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -25,14 +25,13 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+import org.apache.flink.streaming.api.invokable.operator._
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.streaming.api.invokable.StreamInvokable
-import org.apache.flink.streaming.api.invokable.operator.{ GroupedReduceInvokable, StreamReduceInvokable
}
 import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.functions.FoldFunction
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.streaming.api.function.sink.SinkFunction
@@ -425,6 +424,19 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Creates a new [[DataStream]] by folding the elements of this DataStream
+   * into an accumulator that starts at `initialValue`. Every incoming element
+   * is folded into the accumulator and the updated accumulator is emitted, so
+   * the resulting stream contains all intermediate fold results. Unlike
+   * `reduce`, the function does not need to be associative, and the result
+   * type `R` may differ from the element type `T`.
+   *
+   * @throws NullPointerException if `folder` is null
+   */
+  def fold[R: TypeInformation: ClassTag](folder: FoldFunction[R, T], initialValue: R): DataStream[R] = {
+    if (folder == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    val outType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    javaStream.transform("fold", outType, new StreamFoldInvokable[T, R](folder, initialValue))
+  }
+
+
+  /**
    * Creates a new [[DataStream]] by reducing the elements of this DataStream
    * using an associative reduce function.
    */


Mime
View raw message