Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F250118094 for ; Wed, 19 Aug 2015 16:42:23 +0000 (UTC) Received: (qmail 5328 invoked by uid 500); 19 Aug 2015 16:42:11 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 5241 invoked by uid 500); 19 Aug 2015 16:42:11 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 5087 invoked by uid 99); 19 Aug 2015 16:42:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Aug 2015 16:42:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E595EE6826; Wed, 19 Aug 2015 16:42:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.apache.org Date: Wed, 19 Aug 2015 16:42:14 -0000 Message-Id: In-Reply-To: <728ee2b5cda04d6696808a3585c86e96@git.apache.org> References: <728ee2b5cda04d6696808a3585c86e96@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/6] flink git commit: [FLINK-2398][api-breaking] Introduce StreamGraphGenerator http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java index 4de368c..6b12013 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java @@ -17,40 +17,41 @@ package org.apache.flink.streaming.api.datastream; -import java.util.List; - import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation; +import org.apache.flink.streaming.api.transformations.FeedbackTransformation; +import org.apache.flink.streaming.api.transformations.StreamTransformation; + +import java.util.Collection; /** - * The iterative data stream represents the start of an iteration in a - * {@link DataStream}. + * The iterative data stream represents the start of an iteration in a {@link DataStream}. 
* - * @param - * Type of the DataStream + * @param Type of the elements in this Stream */ -public class IterativeDataStream extends - SingleOutputStreamOperator> { - - protected boolean closed = false; +public class IterativeDataStream extends SingleOutputStreamOperator> { - static Integer iterationCount = 0; + // We store these so that we can create a co-iteration if we need to + private DataStream originalInput; + private long maxWaitTime; - protected IterativeDataStream(DataStream dataStream, long maxWaitTime) { - super(dataStream); + protected IterativeDataStream(DataStream dataStream, long maxWaitTime) { + super(dataStream.getExecutionEnvironment(), + new FeedbackTransformation(dataStream.getTransformation(), maxWaitTime)); + this.originalInput = dataStream; + this.maxWaitTime = maxWaitTime; setBufferTimeout(dataStream.environment.getBufferTimeout()); - iterationID = iterationCount; - iterationCount++; - iterationWaitTime = maxWaitTime; } /** * Closes the iteration. This method defines the end of the iterative - * program part that will be fed back to the start of the iteration.
- *
A common usage pattern for streaming iterations is to use output + * program part that will be fed back to the start of the iteration. + * + *

+ * A common usage pattern for streaming iterations is to use output * splitting to send a part of the closing data stream to the head. Refer to * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)} * for more information. @@ -58,50 +59,30 @@ public class IterativeDataStream extends * @param feedbackStream * {@link DataStream} that will be used as input to the iteration * head. - * @param keepPartitioning - * If true the feedback partitioning will be kept as it is (not - * changed to match the input of the iteration head) + * * @return The feedback stream. * */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public DataStream closeWith(DataStream iterationTail, boolean keepPartitioning) { - - if (closed) { - throw new IllegalStateException( - "An iterative data stream can only be closed once. Use union to close with multiple stream."); + public DataStream closeWith(DataStream feedbackStream) { + + Collection> predecessors = feedbackStream.getTransformation().getTransitivePredecessors(); + + if (!predecessors.contains(this.transformation)) { + throw new UnsupportedOperationException( + "Cannot close an iteration with a feedback DataStream that does not originate from said iteration."); } - closed = true; - - streamGraph.addIterationTail((List) iterationTail.unionedStreams, iterationID, - keepPartitioning); - return iterationTail; - } - - /** - * Closes the iteration. This method defines the end of the iterative - * program part that will be fed back to the start of the iteration.
- *
A common usage pattern for streaming iterations is to use output - * splitting to send a part of the closing data stream to the head. Refer to - * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)} - * for more information. - * - * - * @param feedbackStream - * {@link DataStream} that will be used as input to the - * iteration head. - * @return The feedback stream. - * - */ - public DataStream closeWith(DataStream iterationTail) { - return closeWith(iterationTail,false); + ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation()); + + return feedbackStream; } /** * Changes the feedback type of the iteration and allows the user to apply * co-transformations on the input and feedback stream, as in a * {@link ConnectedDataStream}. + * *

* For type safety the user needs to define the feedback type * @@ -109,7 +90,7 @@ public class IterativeDataStream extends * String describing the type information of the feedback stream. * @return A {@link ConnectedIterativeDataStream}. */ - public ConnectedIterativeDataStream withFeedbackType(String feedbackTypeString) { + public ConnectedIterativeDataStream withFeedbackType(String feedbackTypeString) { return withFeedbackType(TypeInfoParser. parse(feedbackTypeString)); } @@ -117,6 +98,7 @@ public class IterativeDataStream extends * Changes the feedback type of the iteration and allows the user to apply * co-transformations on the input and feedback stream, as in a * {@link ConnectedDataStream}. + * *

* For type safety the user needs to define the feedback type * @@ -124,7 +106,7 @@ public class IterativeDataStream extends * Class of the elements in the feedback stream. * @return A {@link ConnectedIterativeDataStream}. */ - public ConnectedIterativeDataStream withFeedbackType(Class feedbackTypeClass) { + public ConnectedIterativeDataStream withFeedbackType(Class feedbackTypeClass) { return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass)); } @@ -132,6 +114,7 @@ public class IterativeDataStream extends * Changes the feedback type of the iteration and allows the user to apply * co-transformations on the input and feedback stream, as in a * {@link ConnectedDataStream}. + * *

* For type safety the user needs to define the feedback type * @@ -139,9 +122,8 @@ public class IterativeDataStream extends * The type information of the feedback stream. * @return A {@link ConnectedIterativeDataStream}. */ - public ConnectedIterativeDataStream withFeedbackType(TypeInformation feedbackType) { - return new ConnectedIterativeDataStream(new IterativeDataStream(this, - iterationWaitTime), feedbackType); + public ConnectedIterativeDataStream withFeedbackType(TypeInformation feedbackType) { + return new ConnectedIterativeDataStream(originalInput, feedbackType, maxWaitTime); } /** @@ -149,6 +131,7 @@ public class IterativeDataStream extends * iterative part of a streaming program, where the original input of the * iteration and the feedback of the iteration are connected as in a * {@link ConnectedDataStream}. + * *

* The user can distinguish between the two inputs using co-transformation, * thus eliminating the need for mapping the inputs and outputs to a common @@ -161,38 +144,18 @@ public class IterativeDataStream extends */ public static class ConnectedIterativeDataStream extends ConnectedDataStream{ - private IterativeDataStream input; - private TypeInformation feedbackType; + private CoFeedbackTransformation coFeedbackTransformation; - public ConnectedIterativeDataStream(IterativeDataStream input, TypeInformation feedbackType) { - super(input, null); - this.input = input; - this.feedbackType = feedbackType; + public ConnectedIterativeDataStream(DataStream input, TypeInformation feedbackType, long waitTime) { + super(input.getExecutionEnvironment(), + input, + new DataStream(input.getExecutionEnvironment(), + new CoFeedbackTransformation(input.getParallelism(), + feedbackType, + waitTime))); + this.coFeedbackTransformation = (CoFeedbackTransformation) getSecond().getTransformation(); } - - @Override - public TypeInformation getType2() { - return feedbackType; - } - - @Override - public SingleOutputStreamOperator transform(String functionName, - TypeInformation outTypeInfo, TwoInputStreamOperator operator) { - @SuppressWarnings({ "unchecked", "rawtypes" }) - SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator( - input.environment, outTypeInfo, operator); - - input.streamGraph.addCoOperator(returnStream.getId(), operator, input.getType(), - feedbackType, outTypeInfo, functionName); - - input.connectGraph(input, returnStream.getId(), 1); - - input.addIterationSource(returnStream, feedbackType); - - return returnStream; - } - /** * Closes the iteration. This method defines the end of the iterative * program part that will be fed back to the start of the iteration as @@ -206,14 +169,16 @@ public class IterativeDataStream extends */ @SuppressWarnings({ "rawtypes", "unchecked" }) public DataStream closeWith(DataStream feedbackStream) { - if (input.closed) { - throw new IllegalStateException( - "An iterative data stream can only be closed once. 
Use union to close with multiple stream."); + + Collection> predecessors = feedbackStream.getTransformation().getTransitivePredecessors(); + + if (!predecessors.contains(this.coFeedbackTransformation)) { + throw new UnsupportedOperationException( + "Cannot close an iteration with a feedback DataStream that does not originate from said iteration."); } - input.closed = true; - - input.streamGraph.addIterationTail((List) feedbackStream.unionedStreams, - input.iterationID, true); + + coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation()); + return feedbackStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java index b944302..7628815 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java @@ -19,7 +19,11 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.runtime.partitioner.HashPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; /** @@ -28,11 +32,10 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; * are also possible on a KeyedDataStream, with the exception of partitioning methods such as shuffle, forward and groupBy. * * - * @param - * The output type of the {@link KeyedDataStream}. 
+ * @param The type of the elements in the Keyed Stream */ -public class KeyedDataStream extends DataStream { - KeySelector keySelector; +public class KeyedDataStream extends DataStream { + KeySelector keySelector; /** * Creates a new {@link KeyedDataStream} using the given {@link KeySelector} @@ -43,35 +46,35 @@ public class KeyedDataStream extends DataStream { * @param keySelector * Function for determining state partitions */ - public KeyedDataStream(DataStream dataStream, KeySelector keySelector) { - super(dataStream.partitionByHash(keySelector)); + public KeyedDataStream(DataStream dataStream, KeySelector keySelector) { + super(dataStream.getExecutionEnvironment(), new PartitionTransformation(dataStream.getTransformation(), new HashPartitioner(keySelector))); this.keySelector = keySelector; } - protected KeyedDataStream(KeyedDataStream dataStream) { - super(dataStream); - this.keySelector = dataStream.keySelector; - } - - public KeySelector getKeySelector() { + public KeySelector getKeySelector() { return this.keySelector; } @Override - protected DataStream setConnectionType(StreamPartitioner partitioner) { + protected DataStream setConnectionType(StreamPartitioner partitioner) { throw new UnsupportedOperationException("Cannot override partitioning for KeyedDataStream."); } @Override - public KeyedDataStream copy() { - return new KeyedDataStream(this); - } - - @Override public SingleOutputStreamOperator transform(String operatorName, - TypeInformation outTypeInfo, OneInputStreamOperator operator) { + TypeInformation outTypeInfo, OneInputStreamOperator operator) { + SingleOutputStreamOperator returnStream = super.transform(operatorName, outTypeInfo,operator); - streamGraph.setKey(returnStream.getId(), keySelector); + + ((OneInputTransformation) returnStream.getTransformation()).setStateKeySelector( + keySelector); return returnStream; } + + @Override + public DataStreamSink addSink(SinkFunction sinkFunction) { + DataStreamSink result = super.addSink(sinkFunction); + result.getTransformation().setStateKeySelector(keySelector); + return result; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index b4a99c8..016cf5e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -24,23 +24,23 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import 
org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; /** * The SingleOutputStreamOperator represents a user defined transformation * applied on a {@link DataStream} with one predefined output type. - * - * @param - * Output type of the operator. - * @param - * Type of the operator. + * + * @param The type of the elements in this Stream + * @param Type of the operator. */ -public class SingleOutputStreamOperator> extends - DataStream { +public class SingleOutputStreamOperator> extends DataStream { - protected boolean isSplit; - protected StreamOperator operator; + protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation transformation) { + super(environment, transformation); + } /** * Gets the name of the current data stream. This name is @@ -48,8 +48,8 @@ public class SingleOutputStreamOperator name(String name){ - streamGraph.getStreamNode(id).setOperatorName(name); + public SingleOutputStreamOperator name(String name){ + transformation.setName(name); return this; } - protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, - TypeInformation outTypeInfo, StreamOperator operator) { - super(environment, outTypeInfo); - this.isSplit = false; - this.operator = operator; - } - - @SuppressWarnings("unchecked") - protected SingleOutputStreamOperator(DataStream dataStream) { - super(dataStream); - if (dataStream instanceof SingleOutputStreamOperator) { - this.isSplit = ((SingleOutputStreamOperator) dataStream).isSplit; - this.operator = ((SingleOutputStreamOperator) dataStream).operator; - } - } - /** * Sets the parallelism for this operator. The degree must be 1 or more. * @@ -86,13 +70,12 @@ public class SingleOutputStreamOperator setParallelism(int parallelism) { + public SingleOutputStreamOperator setParallelism(int parallelism) { if (parallelism < 1) { throw new IllegalArgumentException("The parallelism of an operator must be at least 1."); } - this.parallelism = parallelism; - streamGraph.setParallelism(id, parallelism); + transformation.setParallelism(parallelism); return this; } @@ -105,39 +88,34 @@ public class SingleOutputStreamOperator setBufferTimeout(long timeoutMillis) { - streamGraph.setBufferTimeout(id, timeoutMillis); + public SingleOutputStreamOperator setBufferTimeout(long timeoutMillis) { + transformation.setBufferTimeout(timeoutMillis); return this; } @SuppressWarnings("unchecked") - public SingleOutputStreamOperator broadcast() { - return (SingleOutputStreamOperator) super.broadcast(); + public SingleOutputStreamOperator broadcast() { + return (SingleOutputStreamOperator) super.broadcast(); } @SuppressWarnings("unchecked") - public SingleOutputStreamOperator shuffle() { - return (SingleOutputStreamOperator) super.shuffle(); + public SingleOutputStreamOperator shuffle() { + return (SingleOutputStreamOperator) super.shuffle(); } @SuppressWarnings("unchecked") - public SingleOutputStreamOperator forward() { - return (SingleOutputStreamOperator) super.forward(); + public SingleOutputStreamOperator forward() { + return (SingleOutputStreamOperator) super.forward(); } @SuppressWarnings("unchecked") - public SingleOutputStreamOperator rebalance() { - return (SingleOutputStreamOperator) super.rebalance(); + public SingleOutputStreamOperator rebalance() { + return (SingleOutputStreamOperator) super.rebalance(); } @SuppressWarnings("unchecked") - public SingleOutputStreamOperator global() { - return (SingleOutputStreamOperator) 
super.global(); - } - - @Override - public SingleOutputStreamOperator copy() { - return new SingleOutputStreamOperator(this); + public SingleOutputStreamOperator global() { + return (SingleOutputStreamOperator) super.global(); } /** @@ -149,8 +127,8 @@ public class SingleOutputStreamOperator setChainingStrategy(ChainingStrategy strategy) { - this.operator.setChainingStrategy(strategy); + private SingleOutputStreamOperator setChainingStrategy(ChainingStrategy strategy) { + this.transformation.setChainingStrategy(strategy); return this; } @@ -162,7 +140,7 @@ public class SingleOutputStreamOperator disableChaining() { + public SingleOutputStreamOperator disableChaining() { return setChainingStrategy(AbstractStreamOperator.ChainingStrategy.NEVER); } @@ -173,7 +151,7 @@ public class SingleOutputStreamOperator startNewChain() { + public SingleOutputStreamOperator startNewChain() { return setChainingStrategy(AbstractStreamOperator.ChainingStrategy.HEAD); } @@ -216,7 +194,7 @@ public class SingleOutputStreamOperatorparse(typeInfoString)); + return returns(TypeInfoParser.parse(typeInfoString)); } /** @@ -243,11 +221,11 @@ public class SingleOutputStreamOperator typeInfo) { + public O returns(TypeInformation typeInfo) { if (typeInfo == null) { throw new IllegalArgumentException("Type information must not be null."); } - fillInType(typeInfo); + transformation.setOutputType(typeInfo); @SuppressWarnings("unchecked") O returnType = (O) this; return returnType; @@ -277,13 +255,13 @@ public class SingleOutputStreamOperator typeClass) { + public O returns(Class typeClass) { if (typeClass == null) { throw new IllegalArgumentException("Type class must not be null."); } try { - TypeInformation ti = (TypeInformation) TypeExtractor.createTypeInfo(typeClass); + TypeInformation ti = (TypeInformation) TypeExtractor.createTypeInfo(typeClass); return returns(ti); } catch (InvalidTypesException e) { @@ -291,6 +269,11 @@ public class SingleOutputStreamOperator setConnectionType(StreamPartitioner partitioner) { + return new SingleOutputStreamOperator(this.getExecutionEnvironment(), new PartitionTransformation(this.getTransformation(), partitioner)); + } + /** * By default all operators in a streaming job share the same resource * group. 
Each resource group takes as many task manager slots as the @@ -305,8 +288,8 @@ public class SingleOutputStreamOperator startNewResourceGroup() { - streamGraph.setResourceStrategy(getId(), ResourceStrategy.NEWGROUP); + public SingleOutputStreamOperator startNewResourceGroup() { + transformation.setResourceStrategy(ResourceStrategy.NEWGROUP); return this; } @@ -319,8 +302,8 @@ public class SingleOutputStreamOperator isolateResources() { - streamGraph.setResourceStrategy(getId(), ResourceStrategy.ISOLATE); + public SingleOutputStreamOperator isolateResources() { + transformation.setResourceStrategy(ResourceStrategy.ISOLATE); return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java index 6b95fe7..bc9ecfb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java @@ -17,23 +17,23 @@ package org.apache.flink.streaming.api.datastream; -import java.util.Arrays; - +import com.google.common.collect.Lists; import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.transformations.SelectTransformation; +import org.apache.flink.streaming.api.transformations.SplitTransformation; /** * The SplitDataStream represents an operator that has been split using an * {@link OutputSelector}. Named outputs can be selected using the * {@link #select} function. To apply transformation on the whole output simply * call the transformation on the SplitDataStream - * - * @param - * The type of the output. 
+ * + * @param The type of the elements in the Stream */ public class SplitDataStream extends DataStream { - protected SplitDataStream(DataStream dataStream) { - super(dataStream); + protected SplitDataStream(DataStream dataStream, OutputSelector outputSelector) { + super(dataStream.getExecutionEnvironment(), new SplitTransformation(dataStream.getTransformation(), outputSelector)); } /** @@ -55,12 +55,8 @@ public class SplitDataStream extends DataStream { } } - DataStream returnStream = copy(); - - for (DataStream ds : returnStream.unionedStreams) { - ds.selectedNames = Arrays.asList(outputNames); - } - return returnStream; + SelectTransformation selectTransform = new SelectTransformation(this.getTransformation(), Lists.newArrayList(outputNames)); + return new DataStream(this.getExecutionEnvironment(), selectTransform); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 9565f4b..bf3a11a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -99,7 +99,7 @@ public class WindowedDataStream { protected EvictionPolicy userEvicter; protected WindowedDataStream(DataStream dataStream, WindowingHelper policyHelper) { - this.dataStream = dataStream.copy(); + this.dataStream = dataStream; this.triggerHelper = policyHelper; if (dataStream instanceof GroupedDataStream) { @@ -109,7 +109,7 @@ public class WindowedDataStream { protected WindowedDataStream(DataStream dataStream, TriggerPolicy trigger, EvictionPolicy evicter) { - this.dataStream = dataStream.copy(); + this.dataStream = dataStream; this.userTrigger = trigger; this.userEvicter = evicter; @@ -120,7 +120,7 @@ public class WindowedDataStream { } protected WindowedDataStream(WindowedDataStream windowedDataStream) { - this.dataStream = windowedDataStream.dataStream.copy(); + this.dataStream = windowedDataStream.dataStream; this.discretizerKey = windowedDataStream.discretizerKey; this.groupByKey = windowedDataStream.groupByKey; this.triggerHelper = windowedDataStream.triggerHelper; http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java index 3dd02a3..e0aafb7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java @@ -30,10 +30,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.co.CrossWindowFunction; -import org.apache.flink.streaming.api.operators.co.CoStreamWindow; public class StreamCrossOperator extends - TemporalOperator> { + TemporalOperator>> { public StreamCrossOperator(DataStream input1, DataStream input2) { super(input1, input2); @@ -48,37 +47,42 @@ public class StreamCrossOperator extends } @Override - protected CrossWindow createNextWindowOperator() { + protected CrossWindow> createNextWindowOperator() { CrossWindowFunction> crossWindowFunction = new CrossWindowFunction>( clean(new CrossOperator.DefaultCrossFunction())); - return new CrossWindow(this, input1.connect(input2).addGeneralWindowCombine( + return new CrossWindow>(this, input1.connect(input2).addGeneralWindowCombine( crossWindowFunction, new TupleTypeInfo>(input1.getType(), input2.getType()), windowSize, slideInterval, timeStamp1, timeStamp2)); } - public static class CrossWindow extends - SingleOutputStreamOperator, CrossWindow> implements - TemporalWindow> { + public static class CrossWindow extends + SingleOutputStreamOperator> implements + TemporalWindow> { private StreamCrossOperator op; - public CrossWindow(StreamCrossOperator op, DataStream> ds) { - super(ds); + public CrossWindow(StreamCrossOperator op, DataStream ds) { + super(ds.getExecutionEnvironment(), ds.getTransformation()); this.op = op; } - public CrossWindow every(long length, TimeUnit timeUnit) { + public CrossWindow every(long length, TimeUnit timeUnit) { return every(timeUnit.toMillis(length)); } @SuppressWarnings("unchecked") - public CrossWindow every(long length) { - ((CoStreamWindow) streamGraph.getStreamNode(id).getOperator()) - .setSlideSize(length); - return this; + public CrossWindow every(long length) { + + CrossWindowFunction> crossWindowFunction = new CrossWindowFunction>( + clean(new CrossOperator.DefaultCrossFunction())); + + return (CrossWindow) new CrossWindow>(op, op.input1.connect(op.input2).addGeneralWindowCombine( + crossWindowFunction, + new TupleTypeInfo>(op.input1.getType(), op.input2.getType()), op.windowSize, + length, op.timeStamp1, op.timeStamp2)); } /** @@ -97,13 +101,12 @@ public class StreamCrossOperator extends TypeInformation outTypeInfo = TypeExtractor.getCrossReturnTypes(function, op.input1.getType(), op.input2.getType()); - CoStreamWindow operator = new CoStreamWindow( - new CrossWindowFunction(clean(function)), op.windowSize, - op.slideInterval, op.timeStamp1, op.timeStamp2); - - streamGraph.setOperator(id, operator); + CrossWindowFunction crossWindowFunction = new CrossWindowFunction(clean(function)); - return ((SingleOutputStreamOperator) this).returns(outTypeInfo); + return new CrossWindow(op, op.input1.connect(op.input2).addGeneralWindowCombine( + crossWindowFunction, + outTypeInfo, op.windowSize, + op.slideInterval, op.timeStamp1, op.timeStamp2)); } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java index e18e14b..e48d707 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java @@ -31,7 +31,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.co.JoinWindowFunction; -import org.apache.flink.streaming.api.operators.co.CoStreamWindow; import org.apache.flink.streaming.util.keys.KeySelectorUtil; public class StreamJoinOperator extends @@ -156,7 +155,7 @@ public class StreamJoinOperator extends * @return A streaming join operator. Call {@link JoinedStream#with} to * apply a custom wrapping */ - public JoinedStream equalTo(int... fields) { + public JoinedStream> equalTo(int... fields) { keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys(fields, type2), type2, op.input1.getExecutionEnvironment().getConfig()); return createJoinOperator(); @@ -175,7 +174,7 @@ public class StreamJoinOperator extends * @return A streaming join operator. Call {@link JoinedStream#with} to * apply a custom wrapping */ - public JoinedStream equalTo(String... fields) { + public JoinedStream> equalTo(String... fields) { this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys(fields, type2), type2, op.input1.getExecutionEnvironment().getConfig()); return createJoinOperator(); @@ -198,12 +197,12 @@ public class StreamJoinOperator extends * @return A streaming join operator. Call {@link JoinedStream#with} to * apply a custom wrapping */ - public JoinedStream equalTo(KeySelector keySelector) { + public JoinedStream> equalTo(KeySelector keySelector) { this.keys2 = keySelector; return createJoinOperator(); } - private JoinedStream createJoinOperator() { + private JoinedStream> createJoinOperator() { JoinFunction> joinFunction = new DefaultJoinFunction(); @@ -213,42 +212,44 @@ public class StreamJoinOperator extends TypeInformation> outType = new TupleTypeInfo>( op.input1.getType(), op.input2.getType()); - return new JoinedStream(this, op.input1 + return new JoinedStream>(this, op.input1 .groupBy(keys1) .connect(op.input2.groupBy(keys2)) .addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)); } - } - - public static class JoinedStream extends - SingleOutputStreamOperator, JoinedStream> { - private final JoinPredicate predicate; - - private JoinedStream(JoinPredicate predicate, DataStream> ds) { - super(ds); - this.predicate = predicate; - } - - /** - * Completes a stream join.

The resulting operator wraps each pair - * of joining elements using the user defined {@link JoinFunction} - * - * @return The joined data stream. - */ - @SuppressWarnings("unchecked") - public SingleOutputStreamOperator with(JoinFunction joinFunction) { - - TypeInformation outType = TypeExtractor.getJoinReturnTypes(joinFunction, - predicate.op.input1.getType(), predicate.op.input2.getType()); - - CoStreamWindow operator = new CoStreamWindow( - getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize, - predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2); - - streamGraph.setOperator(id, operator); - return ((SingleOutputStreamOperator) this).returns(outType); + public static class JoinedStream extends + SingleOutputStreamOperator> { + private final JoinPredicate predicate; + + private JoinedStream(JoinPredicate predicate, DataStream ds) { + super(ds.getExecutionEnvironment(), ds.getTransformation()); + this.predicate = predicate; + } + + /** + * Completes a stream join.

The resulting operator wraps each pair + * of joining elements using the user defined {@link JoinFunction} + * + * @return The joined data stream. + */ + @SuppressWarnings("unchecked") + public SingleOutputStreamOperator with(JoinFunction joinFunction) { + + TypeInformation outType = TypeExtractor.getJoinReturnTypes(joinFunction, + predicate.op.input1.getType(), predicate.op.input2.getType()); + + JoinWindowFunction joinWindowFunction = getJoinWindowFunction(joinFunction, predicate); + + + return new JoinedStream( + predicate, predicate.op.input1 + .groupBy(predicate.keys1) + .connect(predicate.op.input2.groupBy(predicate.keys2)) + .addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize, + predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2)); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java index 3fe7eb7..9da00f2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java @@ -40,8 +40,8 @@ public abstract class TemporalOperator> { if (input1 == null || input2 == null) { throw new NullPointerException(); } - this.input1 = input1.copy(); - this.input2 = input2.copy(); + this.input1 = input1; + this.input2 = input2; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index 3efad93..58459b7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -43,9 +43,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { */ @Override public JobExecutionResult execute(String jobName) throws Exception { - JobExecutionResult result = ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(), - getConfig().isSysoutLoggingEnabled()); - streamGraph.clear(); // clear graph to allow submitting another job via the same environment. 
+ JobExecutionResult result = ClusterUtil.runOnMiniCluster(getStreamGraph().getJobGraph(), getParallelism(), getConfig().isSysoutLoggingEnabled()); + transformations.clear(); return result; } } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 75e15d7..2f8938f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -84,15 +84,15 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { @Override public JobExecutionResult execute() throws ProgramInvocationException { - - JobGraph jobGraph = streamGraph.getJobGraph(); + JobGraph jobGraph = getStreamGraph().getJobGraph(); + transformations.clear(); return executeRemotely(jobGraph); } @Override public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - - JobGraph jobGraph = streamGraph.getJobGraph(jobName); + JobGraph jobGraph = getStreamGraph().getJobGraph(jobName); + transformations.clear(); return executeRemotely(jobGraph); } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 9bfeb2f..c2335d6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -63,15 +63,16 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { @Override public JobExecutionResult execute(String jobName) throws Exception { - currentEnvironment = null; JobGraph jobGraph; if (jobName == null) { - jobGraph = this.streamGraph.getJobGraph(); + jobGraph = this.getStreamGraph().getJobGraph(); } else { - jobGraph = this.streamGraph.getJobGraph(jobName); + jobGraph = this.getStreamGraph().getJobGraph(jobName); } + transformations.clear(); + for (File file : jars) { jobGraph.addJar(new Path(file.getAbsolutePath())); } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index fba4e28..a019a31 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.environment; import com.esotericsoftware.kryo.Serializer; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; @@ -60,8 +61,9 @@ import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.types.StringValue; import org.apache.flink.util.SplittableIterator; @@ -86,22 +88,26 @@ public abstract class StreamExecutionEnvironment { private ExecutionConfig config = new ExecutionConfig(); - protected static StreamExecutionEnvironment currentEnvironment; + protected List> transformations = Lists.newArrayList(); - protected StreamGraph streamGraph; + protected boolean isChainingEnabled = true; + + protected long checkpointInterval = -1; // disabled + + protected CheckpointingMode checkpointingMode = null; + + protected boolean forceCheckpointing = false; + + protected StateHandleProvider stateHandleProvider; + + /** The environment of the context (local by default, cluster if invoked through command line) */ + private static StreamExecutionEnvironmentFactory contextEnvironmentFactory; // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- /** - * Constructor for creating StreamExecutionEnvironment - */ - protected StreamExecutionEnvironment() { - streamGraph = new StreamGraph(this); - } - - /** * Gets the config object. */ public ExecutionConfig getConfig() { @@ -221,10 +227,19 @@ public abstract class StreamExecutionEnvironment { * @return StreamExecutionEnvironment with chaining disabled. */ public StreamExecutionEnvironment disableOperatorChaining() { - streamGraph.setChaining(false); + this.isChainingEnabled = false; return this; } + /** + * Returns whether operator chaining is enabled. + * + * @return {@code true} if chaining is enabled, false otherwise. 
+ */ + public boolean isChainingEnabled() { + return isChainingEnabled; + } + // ------------------------------------------------------------------------ // Checkpointing Settings // ------------------------------------------------------------------------ @@ -275,10 +290,9 @@ public abstract class StreamExecutionEnvironment { if (interval <= 0) { throw new IllegalArgumentException("the checkpoint interval must be positive"); } - - streamGraph.setCheckpointingEnabled(true); - streamGraph.setCheckpointingInterval(interval); - streamGraph.setCheckpointingMode(mode); + + this.checkpointInterval = interval; + this.checkpointingMode = mode; return this; } @@ -303,19 +317,9 @@ public abstract class StreamExecutionEnvironment { */ @Deprecated public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) { - if (mode == null) { - throw new NullPointerException("checkpoint mode must not be null"); - } - if (interval <= 0) { - throw new IllegalArgumentException("the checkpoint interval must be positive"); - } - - streamGraph.setCheckpointingEnabled(true); - streamGraph.setCheckpointingInterval(interval); - streamGraph.setCheckpointingMode(mode); - if (force) { - streamGraph.forceCheckpoint(); - } + this.enableCheckpointing(interval, mode); + + this.forceCheckpointing = force; return this; } @@ -334,12 +338,35 @@ public abstract class StreamExecutionEnvironment { * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.

*/ public StreamExecutionEnvironment enableCheckpointing() { - streamGraph.setCheckpointingEnabled(true); - streamGraph.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); return this; } /** + * Returns the checkpointing interval or -1 if checkpointing is disabled. + * + * @return The checkpointing interval or -1 + */ + public long getCheckpointInterval() { + return checkpointInterval; + } + + + /** + * Returns whether checkpointing is force-enabled. + */ + public boolean isForceCheckpointing() { + return forceCheckpointing; + } + + /** + * Returns the {@link CheckpointingMode}. + */ + public CheckpointingMode getCheckpointingMode() { + return checkpointingMode; + } + + /** * Sets the {@link StateHandleProvider} used for storing operator state * checkpoints when checkpointing is enabled. *

@@ -348,11 +375,22 @@ public abstract class StreamExecutionEnvironment { * */ public StreamExecutionEnvironment setStateHandleProvider(StateHandleProvider provider) { - streamGraph.setStateHandleProvider(provider); + this.stateHandleProvider = provider; return this; } /** + * Returns the {@link org.apache.flink.runtime.state.StateHandle} + * + * @see #setStateHandleProvider(org.apache.flink.runtime.state.StateHandleProvider) + * + * @return The StateHandleProvider + */ + public StateHandleProvider getStateHandleProvider() { + return stateHandleProvider; + } + + /** * Sets the number of times that failed tasks are re-executed. A value of * zero effectively disables fault tolerance. A value of {@code -1} * indicates that the system default value (as defined in the configuration) @@ -591,7 +629,7 @@ public abstract class StreamExecutionEnvironment { // must not have null elements and mixed elements FromElementsFunction.checkCollection(data, typeInfo.getTypeClass()); - + SourceFunction function; try { function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); @@ -599,7 +637,7 @@ public abstract class StreamExecutionEnvironment { catch (IOException e) { throw new RuntimeException(e.getMessage(), e); } - return addSource(function, "Collection Source", typeInfo); + return addSource(function, "Collection Source", typeInfo).setParallelism(1); } /** @@ -980,15 +1018,12 @@ public abstract class StreamExecutionEnvironment { private DataStreamSource createInput(InputFormat inputFormat, TypeInformation typeInfo, String sourceName) { FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); - DataStreamSource returnStream = addSource(function, sourceName).returns(typeInfo); - streamGraph.setInputFormat(returnStream.getId(), inputFormat); - return returnStream; + return addSource(function, sourceName).returns(typeInfo); } /** - * Adds a data source with a custom type information thus opening a - * {@link org.apache.flink.streaming.api.datastream.DataStream}. Only in very special cases does the user need - * to support type information. Otherwise use {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)} + * Adds a Data Source to the streaming topology. + * *

* By default sources have a parallelism of 1. To enable parallel execution, the user defined source should * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link @@ -1078,10 +1113,9 @@ public abstract class StreamExecutionEnvironment { boolean isParallel = function instanceof ParallelSourceFunction; clean(function); - StreamOperator sourceOperator = new StreamSource(function); + StreamSource sourceOperator = new StreamSource(function); - return new DataStreamSource(this, sourceName, typeInfo, sourceOperator, - isParallel, sourceName); + return new DataStreamSource(this, typeInfo, sourceOperator, isParallel, sourceName); } // -------------------------------------------------------------------------------------------- @@ -1098,20 +1132,20 @@ public abstract class StreamExecutionEnvironment { * executed. */ public static StreamExecutionEnvironment getExecutionEnvironment() { - if (currentEnvironment != null) { - return currentEnvironment; + if (contextEnvironmentFactory != null) { + return contextEnvironmentFactory.createExecutionEnvironment(); } + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); if (env instanceof ContextEnvironment) { ContextEnvironment ctx = (ContextEnvironment) env; - currentEnvironment = createContextEnvironment(ctx.getClient(), ctx.getJars(), + return createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getParallelism(), ctx.isWait()); } else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) { - currentEnvironment = new StreamPlanEnvironment(env); + return new StreamPlanEnvironment(env); } else { return createLocalEnvironment(); } - return currentEnvironment; } private static StreamExecutionEnvironment createContextEnvironment(Client client, @@ -1143,9 +1177,9 @@ public abstract class StreamExecutionEnvironment { * @return A local execution environment with the specified parallelism. */ public static LocalStreamEnvironment createLocalEnvironment(int parallelism) { - currentEnvironment = new LocalStreamEnvironment(); - currentEnvironment.setParallelism(parallelism); - return (LocalStreamEnvironment) currentEnvironment; + LocalStreamEnvironment env = new LocalStreamEnvironment(); + env.setParallelism(parallelism); + return env; } // TODO:fix cluster default parallelism @@ -1172,8 +1206,8 @@ public abstract class StreamExecutionEnvironment { */ public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles) { - currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles); - return currentEnvironment; + RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles); + return env; } /** @@ -1199,9 +1233,9 @@ public abstract class StreamExecutionEnvironment { */ public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles) { - currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles); - currentEnvironment.setParallelism(parallelism); - return currentEnvironment; + RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles); + env.setParallelism(parallelism); + return env; } /** @@ -1239,7 +1273,11 @@ public abstract class StreamExecutionEnvironment { * @return The streamgraph representing the transformations */ public StreamGraph getStreamGraph() { - return streamGraph; + if (transformations.size() <= 0) { + throw new IllegalStateException("No operators defined in streaming topology. 
Cannot execute."); + } + StreamGraph result = StreamGraphGenerator.generate(this, transformations); + return result; } /** @@ -1254,10 +1292,6 @@ public abstract class StreamExecutionEnvironment { return getStreamGraph().getStreamingPlanAsJSON(); } - protected static void initializeFromFactory(StreamExecutionEnvironmentFactory eef) { - currentEnvironment = eef.createExecutionEnvironment(); - } - /** * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig} @@ -1270,4 +1304,28 @@ public abstract class StreamExecutionEnvironment { return f; } + /** + * Adds an operator to the list of operators that should be executed when calling + * {@link #execute}. + * + *

+ * When calling {@link #execute()} only the operators that where previously added to the list + * are executed. + * + *

+ * This is not meant to be used by users. The API methods that create operators must call + * this method. + */ + public void addOperator(StreamTransformation transformation) { + Preconditions.checkNotNull(transformation, "Sinks must not be null."); + this.transformations.add(transformation); + } + + // -------------------------------------------------------------------------------------------- + // Methods to control the context and local environments for execution from packaged programs + // -------------------------------------------------------------------------------------------- + + protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) { + contextEnvironmentFactory = ctx; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java index 02fccd0..8c1408e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java @@ -24,6 +24,7 @@ import org.apache.flink.client.program.Client.OptimizerPlanEnvironment; import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.streaming.api.graph.StreamGraph; public class StreamPlanEnvironment extends StreamExecutionEnvironment { @@ -55,10 +56,12 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { @Override public JobExecutionResult execute(String jobName) throws Exception { - currentEnvironment = null; + StreamGraph streamGraph = getStreamGraph(); streamGraph.setJobName(jobName); + transformations.clear(); + if (env instanceof OptimizerPlanEnvironment) { ((OptimizerPlanEnvironment) env).setPlan(streamGraph); } else if (env instanceof PreviewPlanEnvironment) { http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java index cf08e5a..253c076 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java @@ -134,4 +134,12 @@ public class FileSourceFunction extends RichParallelSourceFunction { isRunning = false; } + + /** + * Returns the {@code InputFormat}. 
This is only needed because we need to set the input + * split assigner on the {@code StreamGraph}. + */ + public InputFormat getFormat() { + return format; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java index bc20fff..743ee4a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,7 +52,19 @@ public class JSONGenerator { JSONArray nodes = new JSONArray(); json.put("nodes", nodes); List operatorIDs = new ArrayList(streamGraph.getVertexIDs()); - Collections.sort(operatorIDs); + Collections.sort(operatorIDs, new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + // put sinks at the back + if (streamGraph.getSinkIDs().contains(o1)) { + return 1; + } else if (streamGraph.getSinkIDs().contains(o2)) { + return -1; + } else { + return o1 - o2; + } + } + }); visit(nodes, operatorIDs, new HashMap()); return json.toString(); } @@ -87,7 +100,7 @@ public class JSONGenerator { for (StreamEdge inEdge : vertex.getInEdges()) { int operator = inEdge.getSourceId(); - if (streamGraph.vertexIDtoLoop.containsKey(operator)) { + if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) { iterationHead = operator; } } @@ -119,7 +132,7 @@ public class JSONGenerator { toVisit.remove(vertexID); // Ignoring head and tail to avoid redundancy - if (!streamGraph.vertexIDtoLoop.containsKey(vertexID)) { + if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) { JSONObject obj = new JSONObject(); jsonArray.put(obj); decorateNode(vertexID, obj); @@ -131,7 +144,7 @@ public class JSONGenerator { if (edgeRemapings.keySet().contains(inputID)) { decorateEdge(inEdges, vertexID, inputID, inputID); - } else if (!streamGraph.vertexIDtoLoop.containsKey(inputID)) { + } else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) { decorateEdge(iterationInEdges, vertexID, inputID, inputID); } } @@ -147,8 +160,7 @@ public class JSONGenerator { JSONObject input = new JSONObject(); inputArray.put(input); input.put(ID, mappedInputID); - input.put(SHIP_STRATEGY, streamGraph.getStreamEdge(inputID, vertexID).getPartitioner() - .getStrategy()); + input.put(SHIP_STRATEGY, streamGraph.getStreamEdge(inputID, vertexID).getPartitioner()); input.put(SIDE, (inputArray.length() == 0) ? 
"first" : "second"); } @@ -161,8 +173,10 @@ public class JSONGenerator { if (streamGraph.getSourceIDs().contains(vertexID)) { node.put(PACT, "Data Source"); + } else if (streamGraph.getSinkIDs().contains(vertexID)) { + node.put(PACT, "Data Sink"); } else { - node.put(PACT, "Data Stream"); + node.put(PACT, "Operator"); } StreamOperator operator = streamGraph.getStreamNode(vertexID).getOperator(); http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 4f19db6..2c422d9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -257,29 +257,6 @@ public class StreamConfig implements Serializable { return config.getLong(ITERATON_WAIT, 0); } - public void setSelectedNames(Integer output, List selected) { - if (selected == null) { - selected = new ArrayList(); - } - - try { - InstantiationUtil.writeObjectToConfig(selected, this.config, OUTPUT_NAME + output); - } catch (IOException e) { - throw new StreamTaskException("Cannot serialize OutputSelector for name \"" + output+ "\".", e); - } - } - - @SuppressWarnings("unchecked") - public List getSelectedNames(Integer output, ClassLoader cl) { - List selectedNames; - try { - selectedNames = (List) InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_NAME + output, cl); - } catch (Exception e) { - throw new StreamTaskException("Cannot deserialize OutputSelector for name \"" + output + "\".", e); - } - return selectedNames == null ? new ArrayList() : selectedNames; - } - public void setNumberOfInputs(int numberOfInputs) { config.setInteger(NUMBER_OF_INPUTS, numberOfInputs); }