flink-commits mailing list archives

From mbala...@apache.org
Subject [2/5] incubator-flink git commit: [FLINK-1312] [streaming] OutputSelector changed to SAM-type to allow java 8 lambdas for splitting
Date Wed, 10 Dec 2014 14:30:43 GMT
[FLINK-1312] [streaming] OutputSelector changed to SAM-type to allow java 8 lambdas for splitting


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4a7ba2db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4a7ba2db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4a7ba2db

Branch: refs/heads/master
Commit: 4a7ba2dbb7744b41135994d7f947198b9c4e5065
Parents: 51c1f67
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Dec 9 18:11:32 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Wed Dec 10 13:27:38 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  32 ++++--
 .../api/collector/DirectedStreamCollector.java  |  27 +++--
 .../streaming/api/collector/OutputSelector.java |  30 ++----
 .../api/collector/DirectedOutputTest.java       |  26 ++---
 .../api/collector/OutputSelectorTest.java       | 108 ++++++++++---------
 .../examples/iteration/IterateExample.java      |  15 +--
 6 files changed, 124 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 294da24..ec3675a 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -459,36 +459,52 @@ The Reduce operator for the `ConnectedDataStream` applies a simple reduce transf
 
 ### Output splitting
 
-Most data stream operators support directed outputs, meaning that different data elements are received by only given outputs. The outputs are referenced by their name given at the point of receiving:
+Most data stream operators support directed outputs (output splitting), meaning that different output elements are sent only to specific outputs. The outputs are referenced by their name given at the point of receiving:
 
 ~~~java
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
-DataStream<Integer> even = split.select("even");
+DataStream<Integer> even = split.select("even”);
 DataStream<Integer> odd = split.select("odd");
 ~~~
 
-Data streams only receive the elements directed to selected output names. These outputs are directed by implementing a selector function (extending `OutputSelector`):
+In the above example the data stream named ‘even’ will only contain elements that are directed to the output named “even”. The user can of course further transform these new stream by for example squaring only the even elements.
+
+Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”…)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
+
+The outputs of an operator are directed by implementing a selector function (implementing the `OutputSelector` interface):
 
 ~~~java
-void select(OUT value, Collection<String> outputs);
+Iterable<String> select(OUT value);
 ~~~
 
-The data is sent to all the outputs added to the collection outputs (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent. For example:
+The data is sent to all the outputs returned in the iterable (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent.
+
+For example to split even and odd numbers:
 
 ~~~java
 @Override
-void select(Integer value, Collection<String> outputs) {
+Iterable<String> select(Integer value) {	
+
+    List<String> outputs = new ArrayList<String>();
+
     if (value % 2 == 0) {
         outputs.add("even");
     } else {
         outputs.add("odd");
     }
+
+    return outputs;
 }
 ~~~
 
-This output selection allows data streams to listen to multiple outputs, and data points to be sent to multiple outputs. A value is sent to all the outputs specified in the `OutputSelector` and a data stream will receive a value if it has selected any of the outputs the value is sent to. The stream will receive the data at most once.
-It is common that a stream listens to all the outputs, so `split.selectAll()` is provided as an alias for explicitly selecting all output names.
+Or more compactly we can use lambda expressions in Java 8:
+
+~~~java
+SplitDataStream<Integer> split = someDataStream
+					.split(x -> Arrays.asList(String.valueOf(x % 2)));
+~~~
 
+Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.
 
 ### Iterations
 The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
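The updated guide relies on `OutputSelector` now having a single abstract method. The sketch below illustrates this with plain Java only: it declares a local copy of the interface as it appears in this commit (so it compiles without Flink) and implements the even/odd split both as an anonymous class and as a lambda. `EvenOddSelectors` and its helper `namesFor` are hypothetical names for illustration. Note that the guide's one-line lambda `x -> Arrays.asList(String.valueOf(x % 2))` names its outputs "0" and "1" (and "-1" for negative odd numbers, since Java's `%` keeps the sign of the dividend), so downstream code would have to select those names rather than "even" and "odd".

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Local copy of the SAM interface introduced by this commit (not the Flink class itself).
interface OutputSelector<OUT> extends Serializable {
    Iterable<String> select(OUT value);
}

class EvenOddSelectors {
    // Pre-Java 8 style: an anonymous class, as in the guide's first example.
    static final OutputSelector<Integer> ANONYMOUS = new OutputSelector<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<String> select(Integer value) {
            List<String> outputs = new ArrayList<String>();
            if (value % 2 == 0) {
                outputs.add("even");
            } else {
                outputs.add("odd");
            }
            return outputs;
        }
    };

    // Java 8 lambda with the same routing; legal because select is the only abstract method.
    static final OutputSelector<Integer> LAMBDA =
            x -> Collections.singletonList(x % 2 == 0 ? "even" : "odd");

    // The guide's compact lambda: output names are the remainders "0", "1" (or "-1").
    static final OutputSelector<Integer> MODULO =
            x -> Arrays.asList(String.valueOf(x % 2));

    // Copies whatever a selector returns into a list so it can be inspected.
    static List<String> namesFor(OutputSelector<Integer> selector, int value) {
        List<String> names = new ArrayList<String>();
        for (String name : selector.select(value)) {
            names.add(name);
        }
        return names;
    }
}
```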

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 5b9e88c..d029a6e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -17,9 +17,8 @@
 
 package org.apache.flink.streaming.api.collector;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -61,7 +60,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 		super(channelID, serializationDelegate);
 		this.outputSelector = outputSelector;
 		this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-		this.selectAllOutputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.selectAllOutputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 	}
 
 	@Override
@@ -81,19 +80,25 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 	 *
 	 */
 	protected void emitToOutputs() {
-		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
+		Iterable<String> outputNames = outputSelector.select(streamRecord.getObject());
 		emitted.clear();
+
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
+			try {
+				output.emit(serializationDelegate);
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Emit to {} failed due to: {}", output,
+							StringUtils.stringifyException(e));
+				}
+			}
+		}
+		emitted.addAll(selectAllOutputs);
+
 		for (String outputName : outputNames) {
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap
 					.get(outputName);
 			try {
-				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
-					if (!emitted.contains(output)) {
-						output.emit(serializationDelegate);
-						emitted.add(output);
-					}
-				}
-
 				if (outputList == null) {
 					if (LOG.isErrorEnabled()) {
 						String format = String.format(
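The reworked `emitToOutputs()` first sends the record to every writer registered through `selectAll()`, adds those writers to the `emitted` set, and only then walks the names returned by `select`. The hunk above is cut off before the per-name loop body, so the following is a simplified model rather than the collector's code: `FakeWriter` and `DirectedEmitSketch` are hypothetical stand-ins for `RecordWriter` and the collector, and it is an assumption (consistent with the guide's "exactly once" wording) that the `emitted` set also suppresses duplicates for writers reached through several names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a RecordWriter: it only remembers what it received.
class FakeWriter {
    final String label;
    final List<Object> received = new ArrayList<Object>();

    FakeWriter(String label) {
        this.label = label;
    }

    void emit(Object record) {
        received.add(record);
    }
}

// Simplified model of the routing in emitToOutputs(); not the Flink implementation.
class DirectedEmitSketch {
    final Map<String, List<FakeWriter>> outputMap = new HashMap<String, List<FakeWriter>>();
    final List<FakeWriter> selectAllOutputs = new LinkedList<FakeWriter>();
    private final Set<FakeWriter> emitted = new HashSet<FakeWriter>();

    void emitToOutputs(Object record, Iterable<String> outputNames) {
        emitted.clear();

        // 1. Writers that called selectAll() receive every record once, up front.
        for (FakeWriter output : selectAllOutputs) {
            output.emit(record);
        }
        emitted.addAll(selectAllOutputs);

        // 2. Writers registered under a returned name; Set.add returns false for a writer
        //    that was already served, so repeated or overlapping names cause no duplicates.
        for (String name : outputNames) {
            List<FakeWriter> outputList = outputMap.get(name);
            if (outputList == null) {
                continue; // the collector in the diff logs an error for unknown names
            }
            for (FakeWriter output : outputList) {
                if (emitted.add(output)) {
                    output.emit(record);
                }
            }
        }
    }
}
```

Mirroring `DirectedOutputTest`, a writer registered under both "odd" and "ten" receives the value 10 once even if the selector returns "ten" twice.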

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index 9eb33d9..6dbcff4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -18,45 +18,27 @@
 package org.apache.flink.streaming.api.collector;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
 
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 
 /**
- * Class for defining an OutputSelector for a {@link SplitDataStream} using the
- * {@link SingleOutputStreamOperator#split} call. Every output object of a
+ * Interface for defining an OutputSelector for a {@link SplitDataStream} using
+ * the {@link SingleOutputStreamOperator#split} call. Every output object of a
  * {@link SplitDataStream} will run through this operator to select outputs.
  * 
  * @param <OUT>
  *            Type parameter of the split values.
  */
-public abstract class OutputSelector<OUT> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private Collection<String> outputs;
-
-	public OutputSelector() {
-		outputs = new ArrayList<String>();
-	}
-
-	Collection<String> getOutputs(OUT outputObject) {
-		outputs.clear();
-		select(outputObject, outputs);
-		return outputs;
-	}
-
+public interface OutputSelector<OUT> extends Serializable {
 	/**
 	 * Method for selecting output names for the emitted objects when using the
 	 * {@link SingleOutputStreamOperator#split} method. The values will be
-	 * emitted only to output names which are added to the outputs collection.
-	 * The outputs collection is cleared automatically after each select call.
+	 * emitted only to output names which are contained in the returned
+	 * iterable.
 	 * 
 	 * @param value
 	 *            Output object for which the output selection should be made.
-	 * @param outputs
-	 *            Selected output names should be added to this collection.
 	 */
-	public abstract void select(OUT value, Collection<String> outputs);
+	public Iterable<String> select(OUT value);
 }
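Declaring `OutputSelector` as an interface that `extends Serializable` is also what makes lambda selectors serializable: when a lambda's target type is a serializable functional interface, `javac` compiles it into a serializable object. The sketch below checks this with a Java serialization round trip, on the assumption (not shown in this diff) that selectors are shipped to the workers via Java serialization. It uses a local copy of the interface; `SelectorSerialization`, `parity` and `roundTrip` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;

// Local copy of the interface from this commit, so the sketch runs without Flink.
interface OutputSelector<OUT> extends Serializable {
    Iterable<String> select(OUT value);
}

class SelectorSerialization {
    // The lambda's target type is serializable, so javac makes the lambda serializable too.
    static OutputSelector<Integer> parity() {
        return x -> Collections.singletonList(x % 2 == 0 ? "even" : "odd");
    }

    // Writes the object with Java serialization and reads a fresh copy back.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

With the old abstract class, a lambda was not an option at all; with a plain non-serializable functional interface, the same `writeObject` call would throw `NotSerializableException`.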

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index b82ecc2..4ab1be2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -22,13 +22,11 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -54,27 +52,31 @@ public class DirectedOutputTest {
 		}
 	}
 
-	static final class MyOutputSelector extends OutputSelector<Long> {
+	static final class MyOutputSelector implements OutputSelector<Long> {
 		private static final long serialVersionUID = 1L;
 
+		List<String> outputs = new ArrayList<String>();
+
 		@Override
-		public void select(Long value, Collection<String> outputs) {
+		public Iterable<String> select(Long value) {
+			outputs.clear();
 			if (value % 2 == 0) {
 				outputs.add(EVEN);
 			} else {
 				outputs.add(ODD);
 			}
-			
+
 			if (value == 10L) {
 				outputs.add(TEN);
 			}
-			
+
 			if (value == 11L) {
 				outputs.add(NON_SELECTED);
 			}
+			return outputs;
 		}
 	}
-	
+
 	static final class ListSink implements SinkFunction<Long> {
 		private static final long serialVersionUID = 1L;
 
@@ -99,23 +101,23 @@ public class DirectedOutputTest {
 	}
 
 	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
-	
+
 	@Test
 	public void outputSelectorTest() throws Exception {
-		
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		
+
 		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
 		source.select(EVEN).addSink(new ListSink(EVEN));
 		source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
 		source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
 		source.selectAll().addSink(new ListSink(ALL));
-		
+
 		env.executeTest(128);
 		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
 		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(EVEN_AND_ODD));
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+				outputs.get(EVEN_AND_ODD));
 		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
 	}
 }
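The updated `MyOutputSelector` keeps a `List<String>` field, clears it on every call, and returns it. That appears safe with this collector because `emitToOutputs()` iterates the returned names before the next `select` call, but a selector that builds a fresh list per call avoids the shared mutable state. The sketch below writes the test's selector that way as a lambda and recomputes which values the `EVEN` and `ODD`+`TEN` sinks should receive, mirroring the test's assertions. The string values given to `EVEN`, `ODD`, `TEN` and `NON_SELECTED`, and the helper `receivedBy`, are hypothetical; the real constants are defined outside the hunks shown.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Local copy of the interface from this commit, so the sketch runs without Flink.
interface OutputSelector<OUT> extends Serializable {
    Iterable<String> select(OUT value);
}

class DirectedOutputSketch {
    // Hypothetical name values; the test's constants are not shown in the diff.
    static final String EVEN = "even";
    static final String ODD = "odd";
    static final String TEN = "ten";
    static final String NON_SELECTED = "nonSelected";

    // Stateless equivalent of MyOutputSelector: a new list per call, no shared field.
    static final OutputSelector<Long> SELECTOR = value -> {
        List<String> outputs = new ArrayList<String>();
        outputs.add(value % 2 == 0 ? EVEN : ODD);
        if (value == 10L) {
            outputs.add(TEN);
        }
        if (value == 11L) {
            outputs.add(NON_SELECTED);
        }
        return outputs;
    };

    // Values a stream selecting `names` would receive: a value is delivered at most once,
    // if any of the names returned for it was selected.
    static List<Long> receivedBy(OutputSelector<Long> selector, long from, long to, String... names) {
        Set<String> selected = new HashSet<String>(Arrays.asList(names));
        List<Long> received = new ArrayList<Long>();
        for (long v = from; v <= to; v++) {
            for (String name : selector.select(v)) {
                if (selected.contains(name)) {
                    received.add(v);
                    break; // stop after the first match so the value is not added twice
                }
            }
        }
        return received;
    }
}
```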

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
index e465e2f..1615a45 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
@@ -1,54 +1,58 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.junit.Test;
-
-public class OutputSelectorTest {
-
-	static final class MyOutputSelector extends OutputSelector<Tuple1<Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void select(Tuple1<Integer> tuple, Collection<String> outputs) {
-			for (Integer i = 0; i < tuple.f0; i++) {
-				outputs.add(i.toString());
-			}
-		}
-	}
-
-	@Test
-	public void testGetOutputs() {
-		OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
-		List<String> expectedOutputs = new ArrayList<String>();
-		expectedOutputs.add("0");
-		expectedOutputs.add("1");
-		assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(2)));
-		expectedOutputs.add("2");
-		assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(3)));
-	}
-
-}
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.junit.Test;
+
+public class OutputSelectorTest {
+
+	static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Iterable<String> select(Tuple1<Integer> tuple) {
+
+			String[] outputs = new String[tuple.f0];
+
+			for (Integer i = 0; i < tuple.f0; i++) {
+				outputs[i] = i.toString();
+			}
+			return Arrays.asList(outputs);
+		}
+	}
+
+	@Test
+	public void testGetOutputs() {
+		OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
+		List<String> expectedOutputs = new ArrayList<String>();
+		expectedOutputs.add("0");
+		expectedOutputs.add("1");
+		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
+		expectedOutputs.add("2");
+		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
+	}
+
+}
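Because `select` may now return any `Iterable<String>`, the test's loop over a `String[]` can also be written with Java 8 streams. The assertion in `testGetOutputs` compares an `ArrayList` with the list produced by `Arrays.asList`, which passes because `List.equals` compares elements in order regardless of the implementation class. The sketch below is a lambda variant under two assumptions: a local copy of the interface, and a plain `Integer` in place of Flink's `Tuple1<Integer>`; `RangeSelectorSketch` is a hypothetical name.

```java
import java.io.Serializable;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Local copy of the interface from this commit, so the sketch runs without Flink.
interface OutputSelector<OUT> extends Serializable {
    Iterable<String> select(OUT value);
}

class RangeSelectorSketch {
    // For input n, selects the output names "0", "1", ..., up to n - 1,
    // like the test's MyOutputSelector does for Tuple1(n).
    static final OutputSelector<Integer> RANGE = n -> IntStream.range(0, n)
            .mapToObj(i -> Integer.toString(i))
            .collect(Collectors.toList());
}
```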

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 8454f52..54dbdb0 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.examples.iteration;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
@@ -79,8 +78,8 @@ public class IterateExample {
 
 		// apply the step function to add new random value to the tuple and to
 		// increment the counter and split the output with the output selector
-		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle().setBufferTimeout(1)
-				.split(new MySelector());
+		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
+				.setBufferTimeout(1).split(new MySelector());
 
 		// close the iteration by selecting the tuples that were directed to the
 		// 'iterate' channel in the output selector
@@ -129,16 +128,18 @@ public class IterateExample {
 	/**
 	 * OutputSelector testing which tuple needs to be iterated again.
 	 */
-	public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
+	public static class MySelector implements OutputSelector<Tuple2<Double, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void select(Tuple2<Double, Integer> value, Collection<String> outputs) {
+		public Iterable<String> select(Tuple2<Double, Integer> value) {
+			List<String> output = new ArrayList<String>();
 			if (value.f0 > 100) {
-				outputs.add("output");
+				output.add("output");
 			} else {
-				outputs.add("iterate");
+				output.add("iterate");
 			}
+			return output;
 		}
 
 	}
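With the SAM-type interface, `MySelector` in `IterateExample` can shrink to a one-line lambda. The sketch below shows that form; `Pair` is a hypothetical stand-in for Flink's `Tuple2<Double, Integer>` (same `f0`/`f1` field names) and the interface is a local copy, so the code runs without Flink. Returning `Collections.singletonList` instead of a new `ArrayList` is a design choice for selectors that always pick exactly one output; the list is immutable, which should be fine as long as the caller only iterates the result.

```java
import java.io.Serializable;
import java.util.Collections;

// Local copy of the interface from this commit, so the sketch runs without Flink.
interface OutputSelector<OUT> extends Serializable {
    Iterable<String> select(OUT value);
}

// Hypothetical minimal stand-in for Tuple2<Double, Integer>.
class Pair implements Serializable {
    private static final long serialVersionUID = 1L;
    final Double f0;
    final Integer f1;

    Pair(Double f0, Integer f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

class IterateSelectorSketch {
    // Same routing as MySelector: values above 100 leave on "output",
    // everything else is fed back into the loop on "iterate".
    static final OutputSelector<Pair> SELECTOR =
            value -> Collections.singletonList(value.f0 > 100 ? "output" : "iterate");
}
```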

