Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 01E37175AE for ; Fri, 20 Mar 2015 12:41:56 +0000 (UTC) Received: (qmail 73814 invoked by uid 500); 20 Mar 2015 12:41:33 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 73726 invoked by uid 500); 20 Mar 2015 12:41:33 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 73696 invoked by uid 99); 20 Mar 2015 12:41:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 12:41:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 87830E109D; Fri, 20 Mar 2015 12:41:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: gyfora@apache.org To: commits@flink.apache.org Date: Fri, 20 Mar 2015 12:41:37 -0000 Message-Id: <5bd7eb46bd694f72b594b45991028908@git.apache.org> In-Reply-To: <6c96aaf71d7d4011a195932a131985a5@git.apache.org> References: <6c96aaf71d7d4011a195932a131985a5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/10] flink git commit: [FLINK-1594] [streaming] Added OutputSelector wrapping [FLINK-1594] [streaming] Added OutputSelector wrapping Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d832400b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d832400b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d832400b Branch: refs/heads/master Commit: d832400b3fee6c73553492e5954587958b4f7137 Parents: 3158d1d Author: Gábor Hermann Authored: Mon Mar 2 10:20:14 2015 +0100 Committer: Gyula Fora Committed: Fri Mar 20 11:25:03 2015 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/StreamConfig.java | 32 ++--- .../apache/flink/streaming/api/StreamGraph.java | 8 +- .../api/StreamingJobGraphGenerator.java | 2 +- .../api/collector/CollectorWrapper.java | 17 ++- .../api/collector/DirectedCollectorWrapper.java | 131 ------------------- .../streaming/api/collector/OutputSelector.java | 44 ------- .../BroadcastOutputSelectorWrapper.java | 43 ++++++ .../selector/DirectedOutputSelectorWrapper.java | 95 ++++++++++++++ .../api/collector/selector/OutputSelector.java | 44 +++++++ .../selector/OutputSelectorWrapper.java | 31 +++++ .../selector/OutputSelectorWrapperFactory.java | 32 +++++ .../streaming/api/datastream/DataStream.java | 6 +- .../api/datastream/SplitDataStream.java | 2 +- .../api/streamvertex/OutputHandler.java | 39 +++--- .../flink/streaming/api/OutputSplitterTest.java | 2 +- .../api/collector/DirectedOutputTest.java | 1 + .../api/collector/OutputSelectorTest.java | 1 + .../examples/iteration/IterateExample.java | 2 +- .../flink/streaming/api/scala/DataStream.scala | 2 +- 19 files changed, 293 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 5b6de85..f58f0ad 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -27,7 +27,7 @@ import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.api.streamvertex.StreamVertexException; @@ -49,7 +49,8 @@ public class StreamConfig implements Serializable { private static final String VERTEX_NAME = "vertexID"; private static final String OPERATOR_NAME = "operatorName"; private static final String ITERATION_ID = "iteration-id"; - private static final String OUTPUT_SELECTOR = "outputSelector"; +// private static final String OUTPUT_SELECTOR = "outputSelector"; + private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper"; private static final String DIRECTED_EMIT = "directedEmit"; private static final String SERIALIZEDUDF = "serializedudf"; private static final String USER_FUNCTION = "userfunction"; @@ -67,7 +68,6 @@ public class StreamConfig implements Serializable { private static final String IN_STREAM_EDGES = "out stream edges"; // DEFAULT VALUES - private static final long DEFAULT_TIMEOUT = 100; public static final String STATE_MONITORING = "STATE_MONITORING"; @@ -189,33 +189,21 @@ public class StreamConfig implements Serializable { } } - public void setDirectedEmit(boolean directedEmit) { - config.setBoolean(DIRECTED_EMIT, directedEmit); - } - - public boolean isDirectedEmit() { - return config.getBoolean(DIRECTED_EMIT, false); - } - - public void setOutputSelectors(List> outputSelector) { + public void setOutputSelectorWrapper(OutputSelectorWrapper outputSelectorWrapper) { try { - if (outputSelector != null && !outputSelector.isEmpty()) { - setDirectedEmit(true); - config.setBytes(OUTPUT_SELECTOR, - SerializationUtils.serialize((Serializable) outputSelector)); - } + config.setBytes(OUTPUT_SELECTOR_WRAPPER, SerializationUtils.serialize(outputSelectorWrapper)); } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize OutputSelector"); + throw new RuntimeException("Cannot serialize OutputSelectorWrapper"); } } @SuppressWarnings("unchecked") - public List> getOutputSelectors(ClassLoader cl) { + public OutputSelectorWrapper getOutputSelectorWrapper(ClassLoader cl) { try { - return (List>) InstantiationUtil.readObjectFromConfig(this.config, - OUTPUT_SELECTOR, cl); + return (OutputSelectorWrapper) InstantiationUtil.readObjectFromConfig(this.config, + OUTPUT_SELECTOR_WRAPPER, cl); } catch (Exception e) { - throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e); + throw new StreamVertexException("Cannot deserialize and instantiate OutputSelectorWrapper", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index dfe66a5..948ea5e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -37,7 +37,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory; +import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; @@ -537,8 +539,8 @@ public class StreamGraph extends StreamingPlan { return inputFormatLists.get(vertexID); } - public List> getOutputSelector(Integer vertexID) { - return outputSelectors.get(vertexID); + public OutputSelectorWrapper getOutputSelectorWrapper(Integer vertexID) { + return OutputSelectorWrapperFactory.create(outputSelectors.get(vertexID)); } public Integer getIterationID(Integer vertexID) { http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java index 3ff64ca..79d43c0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -213,7 +213,7 @@ public class StreamingJobGraphGenerator { config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexID)); config.setUserInvokable(streamGraph.getInvokable(vertexID)); - config.setOutputSelectors(streamGraph.getOutputSelector(vertexID)); + config.setOutputSelectorWrapper(streamGraph.getOutputSelectorWrapper(vertexID)); config.setNumberOfOutputs(nonChainableOutputs.size()); config.setNonChainedOutputs(nonChainableOutputs); http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java index 1281bf0..bb0268a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java @@ -17,27 +17,26 @@ package org.apache.flink.streaming.api.collector; -import java.util.LinkedList; -import java.util.List; - +import org.apache.flink.streaming.api.StreamEdge; +import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; import org.apache.flink.util.Collector; public class CollectorWrapper implements Collector { - private List> outputs; + private OutputSelectorWrapper outputSelectorWrapper; - public CollectorWrapper() { - this.outputs = new LinkedList>(); + public CollectorWrapper(OutputSelectorWrapper outputSelectorWrapper) { + this.outputSelectorWrapper = outputSelectorWrapper; } @SuppressWarnings("unchecked") - public void addCollector(Collector output) { - outputs.add((Collector) output); + public void addCollector(Collector output, StreamEdge edge) { + outputSelectorWrapper.addCollector(output, edge); } @Override public void collect(OUT record) { - for(Collector output: outputs){ + for (Collector output : outputSelectorWrapper.getSelectedOutputs(record)) { output.collect(record); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java deleted file mode 100755 index 4681cd3..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A StreamCollector that uses user defined output names and a user defined - * output selector to make directed emits. - * - * @param - * Type of the Tuple collected. - */ -public class DirectedCollectorWrapper extends CollectorWrapper { - - private static final Logger LOG = LoggerFactory.getLogger(DirectedCollectorWrapper.class); - - List> outputSelectors; - - protected Map>> outputMap; - - private List> selectAllOutputs; - private Set> emitted; - - /** - * Creates a new DirectedStreamCollector - * - * @param outputSelector - * User defined {@link OutputSelector} - */ - public DirectedCollectorWrapper(List> outputSelectors) { - this.outputSelectors = outputSelectors; - this.emitted = new HashSet>(); - this.selectAllOutputs = new LinkedList>(); - this.outputMap = new HashMap>>(); - - } - - @Override - public void addCollector(Collector output) { - addCollector(output, new ArrayList()); - } - - @SuppressWarnings("unchecked") - public void addCollector(Collector output, List selectedNames) { - - if (selectedNames.isEmpty()) { - selectAllOutputs.add((Collector) output); - } else { - for (String selectedName : selectedNames) { - - if (!outputMap.containsKey(selectedName)) { - outputMap.put(selectedName, new LinkedList>()); - outputMap.get(selectedName).add((Collector) output); - } else { - if (!outputMap.get(selectedName).contains(output)) { - outputMap.get(selectedName).add((Collector) output); - } - } - - } - } - } - - @Override - public void collect(OUT record) { - emitted.clear(); - - for (Collector output : selectAllOutputs) { - output.collect(record); - emitted.add(output); - } - - for (OutputSelector outputSelector : outputSelectors) { - Iterable outputNames = outputSelector.select(record); - - for (String outputName : outputNames) { - List> outputList = outputMap.get(outputName); - if (outputList == null) { - if (LOG.isErrorEnabled()) { - String format = String.format( - "Cannot emit because no output is selected with the name: %s", - outputName); - LOG.error(format); - - } - } else { - for (Collector output : outputList) { - if (!emitted.contains(output)) { - output.collect(record); - emitted.add(output); - } - } - - } - - } - } - - } - - @Override - public void close() { - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java deleted file mode 100644 index 6dbcff4..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import java.io.Serializable; - -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.SplitDataStream; - -/** - * Interface for defining an OutputSelector for a {@link SplitDataStream} using - * the {@link SingleOutputStreamOperator#split} call. Every output object of a - * {@link SplitDataStream} will run through this operator to select outputs. - * - * @param - * Type parameter of the split values. - */ -public interface OutputSelector extends Serializable { - /** - * Method for selecting output names for the emitted objects when using the - * {@link SingleOutputStreamOperator#split} method. The values will be - * emitted only to output names which are contained in the returned - * iterable. - * - * @param value - * Output object for which the output selection should be made. - */ - public Iterable select(OUT value); -} http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java new file mode 100644 index 0000000..44371f0 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector.selector; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.streaming.api.StreamEdge; +import org.apache.flink.util.Collector; + +public class BroadcastOutputSelectorWrapper implements OutputSelectorWrapper { + + private List> outputs; + + public BroadcastOutputSelectorWrapper() { + outputs = new ArrayList>(); + } + + @Override + public void addCollector(Collector output, StreamEdge edge) { + outputs.add((Collector) output); + } + + @Override + public Iterable> getSelectedOutputs(OUT record) { + return outputs; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java new file mode 100644 index 0000000..624fac1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector.selector; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.streaming.api.StreamEdge; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DirectedOutputSelectorWrapper implements OutputSelectorWrapper { + + private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class); + + private List> outputSelectors; + + private Map>> outputMap; + private Set> selectAllOutputs; +// private Set> emitted; + + public DirectedOutputSelectorWrapper(List> outputSelectors) { + this.outputSelectors = outputSelectors; +// this.emitted = new HashSet>(); + this.selectAllOutputs = new HashSet>(); //new LinkedList>(); + this.outputMap = new HashMap>>(); + } + + @Override + public void addCollector(Collector output, StreamEdge edge) { + List selectedNames = edge.getSelectedNames(); + + if (selectedNames.isEmpty()) { + selectAllOutputs.add((Collector) output); + } else { + for (String selectedName : selectedNames) { + + if (!outputMap.containsKey(selectedName)) { + outputMap.put(selectedName, new LinkedList>()); + outputMap.get(selectedName).add((Collector) output); + } else { + if (!outputMap.get(selectedName).contains(output)) { + outputMap.get(selectedName).add((Collector) output); + } + } + } + } + } + + @Override + public Iterable> getSelectedOutputs(OUT record) { + Set> selectedOutputs = new HashSet>(selectAllOutputs); + + for (OutputSelector outputSelector : outputSelectors) { + Iterable outputNames = outputSelector.select(record); + + for (String outputName : outputNames) { + List> outputList = outputMap.get(outputName); + + try { + selectedOutputs.addAll(outputList); + } catch (NullPointerException e) { + if (LOG.isErrorEnabled()) { + String format = String.format( + "Cannot emit because no output is selected with the name: %s", + outputName); + LOG.error(format); + } + } + } + } + + return selectedOutputs; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java new file mode 100644 index 0000000..b886fa6 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector.selector; + +import java.io.Serializable; + +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitDataStream; + +/** + * Interface for defining an OutputSelector for a {@link SplitDataStream} using + * the {@link SingleOutputStreamOperator#split} call. Every output object of a + * {@link SplitDataStream} will run through this operator to select outputs. + * + * @param + * Type parameter of the split values. + */ +public interface OutputSelector extends Serializable { + /** + * Method for selecting output names for the emitted objects when using the + * {@link SingleOutputStreamOperator#split} method. The values will be + * emitted only to output names which are contained in the returned + * iterable. + * + * @param value + * Output object for which the output selection should be made. + */ + public Iterable select(OUT value); +} http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java new file mode 100644 index 0000000..850a1d9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector.selector; + +import java.io.Serializable; + +import org.apache.flink.streaming.api.StreamEdge; +import org.apache.flink.util.Collector; + +public interface OutputSelectorWrapper extends Serializable { + + public void addCollector(Collector output, StreamEdge edge); + + public Iterable> getSelectedOutputs(OUT record); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java new file mode 100644 index 0000000..c0f22c7 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector.selector; + +import java.util.List; + +public class OutputSelectorWrapperFactory { + + public static OutputSelectorWrapper create(List> outputSelectors) { + if (outputSelectors.size() == 0) { + return new BroadcastOutputSelectorWrapper(); + } else { + return new DirectedOutputSelectorWrapper(outputSelectors); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 879a98c..5f6f981 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -47,7 +47,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.StreamGraph; -import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator; import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -238,12 +238,12 @@ public class DataStream { /** * Operator used for directing tuples to specific named outputs using an - * {@link org.apache.flink.streaming.api.collector.OutputSelector}. Calling + * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}. Calling * this method on an operator creates a new {@link SplitDataStream}. * * @param outputSelector * The user defined - * {@link org.apache.flink.streaming.api.collector.OutputSelector} + * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector} * for directing the tuples. * @return The {@link SplitDataStream} */ http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java index 97458a8..69e059e 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.datastream; import java.util.Arrays; -import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; /** * The SplitDataStream represents an operator that has been split using an http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index ca6b34d..18ddc79 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -30,8 +30,8 @@ import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.StreamConfig; import org.apache.flink.streaming.api.StreamEdge; import org.apache.flink.streaming.api.collector.CollectorWrapper; -import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper; import org.apache.flink.streaming.api.collector.StreamOutput; +import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; @@ -100,22 +100,22 @@ public class OutputHandler { * This method builds up a nested collector which encapsulates all the * chained operators and their network output. The result of this recursive * call will be passed as collector to the first invokable in the chain. - * + * * @param chainedTaskConfig - * The configuration of the starting operator of the chain, we - * use this paramater to recursively build the whole chain + * The configuration of the starting operator of the chain, we + * use this paramater to recursively build the whole chain * @return Returns the collector for the chain starting from the given - * config + * config */ - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) private Collector createChainedCollector(StreamConfig chainedTaskConfig) { - boolean isDirectEmit = chainedTaskConfig.isDirectedEmit(); // We create a wrapper that will encapsulate the chained operators and // network outputs - CollectorWrapper wrapper = isDirectEmit ? new DirectedCollectorWrapper( - chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper(); + + OutputSelectorWrapper outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl); + CollectorWrapper wrapper = new CollectorWrapper(outputSelectorWrapper); // Create collectors for the network outputs for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) { @@ -123,12 +123,7 @@ public class OutputHandler { Collector outCollector = outputMap.get(output); - if (isDirectEmit) { - ((DirectedCollectorWrapper) wrapper).addCollector(outCollector, - chainedTaskConfig.getSelectedNames(output)); - } else { - wrapper.addCollector(outCollector); - } + wrapper.addCollector(outCollector, outputEdge); } // Create collectors for the chained outputs @@ -136,12 +131,8 @@ public class OutputHandler { Integer output = outputEdge.getTargetVertex(); Collector outCollector = createChainedCollector(chainedConfigs.get(output)); - if (isDirectEmit) { - ((DirectedCollectorWrapper) wrapper).addCollector(outCollector, - chainedTaskConfig.getSelectedNames(output)); - } else { - wrapper.addCollector(outCollector); - } + + wrapper.addCollector(outCollector, outputEdge); } if (chainedTaskConfig.isChainStart()) { @@ -169,11 +160,11 @@ public class OutputHandler { /** * We create the StreamOutput for the specific output given by the id, and * the configuration of its source task - * + * * @param outputVertex - * Name of the output to which the streamoutput will be set up + * Name of the output to which the streamoutput will be set up * @param configuration - * The config of upStream task + * The config of upStream task * @return */ private StreamOutput createStreamOutput(Integer outputVertex, http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java index cf6bb3c..14f0fa0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.TestListResultSink; http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java index ffc7c74..13bf457 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.sink.SinkFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java index 1615a45..a3d89f2 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.junit.Test; public class OutputSelectorTest { http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index 998e818..bbd5433 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.datastream.SplitDataStream; http://git-wip-us.apache.org/repos/asf/flink/blob/d832400b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 59b1906..3dc54d6 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.api.java.typeutils.TupleTypeInfoBase +import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator, GroupedDataStream} import scala.collection.JavaConverters._ @@ -38,7 +39,6 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean import org.apache.flink.streaming.api.windowing.helper.WindowingHelper import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy } -import org.apache.flink.streaming.api.collector.OutputSelector import scala.collection.JavaConversions._ import java.util.HashMap import org.apache.flink.streaming.api.function.aggregation.SumFunction