From: mbalassi@apache.org
To: commits@flink.incubator.apache.org
Reply-To: dev@flink.incubator.apache.org
Date: Wed, 10 Dec 2014 14:30:43 -0000
Message-Id: <91e6dd0ab3d74256ab29761193419bd9@git.apache.org>
In-Reply-To: <27503158a8e94bdfb276e88be31d2591@git.apache.org>
References: <27503158a8e94bdfb276e88be31d2591@git.apache.org>
Subject: [2/5] incubator-flink git commit: [FLINK-1312] [streaming] OutputSelector changed to SAM-type to allow java 8 lambdas for splitting

[FLINK-1312] [streaming] OutputSelector changed to SAM-type to allow java 8 lambdas for splitting


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4a7ba2db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4a7ba2db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4a7ba2db

Branch: refs/heads/master
Commit: 4a7ba2dbb7744b41135994d7f947198b9c4e5065
Parents: 51c1f67
Author: Gyula Fora
Authored: Tue Dec 9 18:11:32 2014 +0100
Committer: mbalassi
Committed: Wed Dec 10 13:27:38 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  32 ++++--
 .../api/collector/DirectedStreamCollector.java  |  27 +++--
 .../streaming/api/collector/OutputSelector.java |  30 ++----
 .../api/collector/DirectedOutputTest.java       |  26 ++---
 .../api/collector/OutputSelectorTest.java       | 108 ++++++++++---------
 .../examples/iteration/IterateExample.java      |  15 +--
 6 files changed, 124 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 294da24..ec3675a 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -459,36 +459,52 @@ The Reduce operator for the `ConnectedDataStream` applies a simple reduce transf
 
 ### Output splitting
 
-Most data stream operators support directed outputs, meaning that different data elements are received by only given outputs. The outputs are referenced by their name given at the point of receiving:
+Most data stream operators support directed outputs (output splitting), meaning that each output element is sent only to specific outputs. The outputs are referenced by the names under which they are selected:
 
 ~~~java
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
 DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
 ~~~
 
-Data streams only receive the elements directed to selected output names. These outputs are directed by implementing a selector function (extending `OutputSelector`):
+In the above example, the data stream `even` will only contain elements that are directed to the output named "even". These new streams can be transformed further, for example by squaring only the even elements.
+
+Data streams only receive the elements directed to the selected output names. Multiple output names can be selected with `splitStream.select("output1", "output2", ...)`. It is common for a stream to listen to all the outputs, so `split.selectAll()` provides this without having to list every name.
+
+The outputs of an operator are directed by a selector function that implements the `OutputSelector` interface:
 
 ~~~java
-void select(OUT value, Collection<String> outputs);
+Iterable<String> select(OUT value);
 ~~~
 
-The data is sent to all the outputs added to the collection outputs (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent. For example:
+Each element is sent to all the outputs whose names are returned in the iterable, so the routing of an element can depend on its value.
+
+For example, to split even and odd numbers:
 
 ~~~java
 @Override
-void select(Integer value, Collection<String> outputs) {
+Iterable<String> select(Integer value) {
+
+    List<String> outputs = new ArrayList<String>();
+
     if (value % 2 == 0) {
         outputs.add("even");
     } else {
         outputs.add("odd");
     }
+
+    return outputs;
 }
 ~~~
 
-This output selection allows data streams to listen to multiple outputs, and data points to be sent to multiple outputs. A value is sent to all the outputs specified in the `OutputSelector` and a data stream will receive a value if it has selected any of the outputs the value is sent to. The stream will receive the data at most once.
-It is common that a stream listens to all the outputs, so `split.selectAll()` is provided as an alias for explicitly selecting all output names.
+Or, more compactly, with a Java 8 lambda expression:
+
+~~~java
+SplitDataStream<Integer> split = someDataStream
+    .split(x -> Arrays.asList(x % 2 == 0 ? "even" : "odd"));
+~~~
 
+Each element is emitted to each selected output exactly once, even if the same output name is returned more than once.
 ### Iterations
 
 The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
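Because `OutputSelector` now has a single abstract method, a Java 8 lambda can implement it directly, as the guide's lambda example does. The self-contained sketch that follows illustrates this together with the "at most once per output" routing the guide describes. It does not depend on Flink: `OutputSelector` here is a local stand-in that copies the new interface's signature, and `SplitSketch.route` is a hypothetical helper (not a Flink API) that collects the distinct output names for one element. Deduplicating by name only approximates the real collector, which deduplicates per output writer.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Local stand-in mirroring the single-method OutputSelector from this commit.
// It is NOT the Flink class; it exists so the sketch compiles without Flink.
interface OutputSelector<OUT> extends Serializable {
    Iterable<String> select(OUT value);
}

final class SplitSketch {

    // Hypothetical helper: the distinct output names a value is routed to.
    // The LinkedHashSet drops repeated names while keeping first-seen order.
    static <T> Set<String> route(OutputSelector<T> selector, T value) {
        Set<String> names = new LinkedHashSet<>();
        for (String name : selector.select(value)) {
            names.add(name);
        }
        return names;
    }

    public static void main(String[] args) {
        // A single abstract method means a lambda can serve as the selector.
        OutputSelector<Integer> evenOdd =
                x -> Arrays.asList(x % 2 == 0 ? "even" : "odd");

        // A selector that returns "all" twice: the duplicate collapses.
        OutputSelector<Integer> repeating =
                x -> Arrays.asList("all", "all", x > 10 ? "big" : "small");

        System.out.println(route(evenOdd, 4));    // prints [even]
        System.out.println(route(evenOdd, 7));    // prints [odd]
        System.out.println(route(repeating, 12)); // prints [all, big]
    }
}
```

Returning an `Iterable` instead of filling a caller-supplied collection is what removes the need for the mutable state that the old abstract class kept between calls.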
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 5b9e88c..d029a6e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -17,9 +17,8 @@
 
 package org.apache.flink.streaming.api.collector;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -61,7 +60,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 		super(channelID, serializationDelegate);
 		this.outputSelector = outputSelector;
 		this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-		this.selectAllOutputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.selectAllOutputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 	}
 
 	@Override
@@ -81,19 +80,25 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 	 * 
 	 */
 	protected void emitToOutputs() {
-		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
+		Iterable<String> outputNames = outputSelector.select(streamRecord.getObject());
 		emitted.clear();
+
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
+			try {
+				output.emit(serializationDelegate);
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Emit to {} failed due to: {}", output,
+							StringUtils.stringifyException(e));
+				}
+			}
+		}
+		emitted.addAll(selectAllOutputs);
+
 		for (String outputName : outputNames) {
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap
 					.get(outputName);
 			try {
-				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
-					if (!emitted.contains(output)) {
-						output.emit(serializationDelegate);
-						emitted.add(output);
-					}
-				}
-
 				if (outputList == null) {
 					if (LOG.isErrorEnabled()) {
 						String format = String.format(


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index 9eb33d9..6dbcff4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -18,45 +18,27 @@
 package org.apache.flink.streaming.api.collector;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
 
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 
 /**
- * Class for defining an OutputSelector for a {@link SplitDataStream} using the
- * {@link SingleOutputStreamOperator#split} call. Every output object of a
+ * Interface for defining an OutputSelector for a {@link SplitDataStream} using
+ * the {@link SingleOutputStreamOperator#split} call. Every output object of a
  * {@link SplitDataStream} will run through this operator to select outputs.
  * 
  * @param <OUT>
  *            Type parameter of the split values.
  */
-public abstract class OutputSelector<OUT> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private Collection<String> outputs;
-
-	public OutputSelector() {
-		outputs = new ArrayList<String>();
-	}
-
-	Collection<String> getOutputs(OUT outputObject) {
-		outputs.clear();
-		select(outputObject, outputs);
-		return outputs;
-	}
-
+public interface OutputSelector<OUT> extends Serializable {
 	/**
 	 * Method for selecting output names for the emitted objects when using the
 	 * {@link SingleOutputStreamOperator#split} method. The values will be
-	 * emitted only to output names which are added to the outputs collection.
-	 * The outputs collection is cleared automatically after each select call.
+	 * emitted only to output names which are contained in the returned
+	 * iterable.
 	 * 
 	 * @param value
 	 *            Output object for which the output selection should be made.
-	 * @param outputs
-	 *            Selected output names should be added to this collection.
 	 */
-	public abstract void select(OUT value, Collection<String> outputs);
+	public Iterable<String> select(OUT value);
 }


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index b82ecc2..4ab1be2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -22,13 +22,11 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -54,27 +52,31 @@ public class DirectedOutputTest {
 		}
 	}
 
-	static final class MyOutputSelector extends OutputSelector<Long> {
+	static final class MyOutputSelector implements OutputSelector<Long> {
 		private static final long serialVersionUID = 1L;
 
+		List<String> outputs = new ArrayList<String>();
+
 		@Override
-		public void select(Long value, Collection<String> outputs) {
+		public Iterable<String> select(Long value) {
+			outputs.clear();
 			if (value % 2 == 0) {
 				outputs.add(EVEN);
 			} else {
 				outputs.add(ODD);
 			}
-			
+
 			if (value == 10L) {
 				outputs.add(TEN);
 			}
-			
+
 			if (value == 11L) {
 				outputs.add(NON_SELECTED);
 			}
+			return outputs;
 		}
 	}
-	
+
 	static final class ListSink implements SinkFunction<Long> {
 		private static final long serialVersionUID = 1L;
 
@@ -99,23 +101,23 @@ public class DirectedOutputTest {
 	}
 
 	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
-	
+
 	@Test
 	public void outputSelectorTest() throws Exception {
-
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		
+
 		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
 		source.select(EVEN).addSink(new ListSink(EVEN));
 		source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
 		source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
 		source.selectAll().addSink(new ListSink(ALL));
-		
+
 		env.executeTest(128);
 
 		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
 		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(EVEN_AND_ODD));
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+				outputs.get(EVEN_AND_ODD));
 		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
 	}
 }


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
index e465e2f..1615a45 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
@@ -1,54 +1,58 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.junit.Test;
-
-public class OutputSelectorTest {
-
-	static final class MyOutputSelector extends OutputSelector<Tuple1<Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void select(Tuple1<Integer> tuple, Collection<String> outputs) {
-			for (Integer i = 0; i < tuple.f0; i++) {
-				outputs.add(i.toString());
-			}
-		}
-	}
-
-	@Test
-	public void testGetOutputs() {
-		OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
-		List<String> expectedOutputs = new ArrayList<String>();
-		expectedOutputs.add("0");
-		expectedOutputs.add("1");
-		assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(2)));
-		expectedOutputs.add("2");
-		assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(3)));
-	}
-
-}
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.junit.Test;
+
+public class OutputSelectorTest {
+
+	static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Iterable<String> select(Tuple1<Integer> tuple) {
+
+			String[] outputs = new String[tuple.f0];
+
+			for (Integer i = 0; i < tuple.f0; i++) {
+				outputs[i] = i.toString();
+			}
+			return Arrays.asList(outputs);
+		}
+	}
+
+	@Test
+	public void testGetOutputs() {
+		OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
+		List<String> expectedOutputs = new ArrayList<String>();
+		expectedOutputs.add("0");
+		expectedOutputs.add("1");
+		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
+		expectedOutputs.add("2");
+		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
+	}
+
+}


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 8454f52..54dbdb0 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.examples.iteration;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
@@ -79,8 +78,8 @@ public class IterateExample {
 
 		// apply the step function to add new random value to the tuple and to
 		// increment the counter and split the output with the output selector
-		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle().setBufferTimeout(1)
-				.split(new MySelector());
+		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
SplitDataStream> step = it.map(new Step()).shuffle() + .setBufferTimeout(1).split(new MySelector()); // close the iteration by selecting the tuples that were directed to the // 'iterate' channel in the output selector @@ -129,16 +128,18 @@ public class IterateExample { /** * OutputSelector testing which tuple needs to be iterated again. */ - public static class MySelector extends OutputSelector> { + public static class MySelector implements OutputSelector> { private static final long serialVersionUID = 1L; @Override - public void select(Tuple2 value, Collection outputs) { + public Iterable select(Tuple2 value) { + List output = new ArrayList(); if (value.f0 > 100) { - outputs.add("output"); + output.add("output"); } else { - outputs.add("iterate"); + output.add("iterate"); } + return output; } }