flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [43/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
Date Wed, 21 Oct 2015 09:03:59 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
deleted file mode 100644
index 4074a1d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
-
-/**
- * {@code ConnectedStreams} represents two connected streams of (possibly) different data types. It
- * can be used to apply transformations such as {@link CoMapFunction} on two
- * {@link DataStream DataStreams}.
- * 
- * @param <IN1> Type of the first input data stream.
- * @param <IN2> Type of the second input data stream.
- */
-public class ConnectedStreams<IN1, IN2> {
-
-	protected StreamExecutionEnvironment environment;
-	protected DataStream<IN1> inputStream1;
-	protected DataStream<IN2> inputStream2;
-
-	protected ConnectedStreams(StreamExecutionEnvironment env,
-			DataStream<IN1> input1,
-			DataStream<IN2> input2) {
-		this.environment = env;
-		if (input1 != null) {
-			this.inputStream1 = input1;
-		}
-		if (input2 != null) {
-			this.inputStream2 = input2;
-		}
-	}
-
-	public StreamExecutionEnvironment getExecutionEnvironment() {
-		return environment;
-	}
-
-	/**
-	 * Returns the first {@link DataStream}.
-	 *
-	 * @return The first DataStream.
-	 */
-	public DataStream<IN1> getFirstInput() {
-		return inputStream1;
-	}
-
-	/**
-	 * Returns the second {@link DataStream}.
-	 *
-	 * @return The second DataStream.
-	 */
-	public DataStream<IN2> getSecondInput() {
-		return inputStream2;
-	}
-
-	/**
-	 * Gets the type of the first input
-	 *
-	 * @return The type of the first input
-	 */
-	public TypeInformation<IN1> getType1() {
-		return inputStream1.getType();
-	}
-
-	/**
-	 * Gets the type of the second input
-	 *
-	 * @return The type of the second input
-	 */
-	public TypeInformation<IN2> getType2() {
-		return inputStream2.getType();
-	}
-
-	/**
-	 * KeyBy operation for connected data stream. Assigns keys to the elements of
-	 * input1 and input2 according to keyPosition1 and keyPosition2.
-	 *
-	 * @param keyPosition1
-	 *            The field used to compute the hashcode of the elements in the
-	 *            first input stream.
-	 * @param keyPosition2
-	 *            The field used to compute the hashcode of the elements in the
-	 *            second input stream.
-	 * @return The grouped {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) {
-		return new ConnectedStreams<>(this.environment, inputStream1.keyBy(keyPosition1),
-				inputStream2.keyBy(keyPosition2));
-	}
-
-	/**
-	 * KeyBy operation for connected data stream. Assigns keys to the elements of
-	 * input1 and input2 according to keyPositions1 and keyPositions2.
-	 *
-	 * @param keyPositions1
-	 *            The fields used to group the first input stream.
-	 * @param keyPositions2
-	 *            The fields used to group the second input stream.
-	 * @return The grouped {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) {
-		return new ConnectedStreams<>(environment, inputStream1.keyBy(keyPositions1),
-				inputStream2.keyBy(keyPositions2));
-	}
-
-	/**
-	 * KeyBy operation for connected data stream using key expressions. Assigns keys to
-	 * the elements of input1 and input2 according to field1 and field2. A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field1
-	 *            The grouping expression for the first input
-	 * @param field2
-	 *            The grouping expression for the second input
-	 * @return The grouped {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) {
-		return new ConnectedStreams<>(environment, inputStream1.keyBy(field1),
-				inputStream2.keyBy(field2));
-	}
-
-	/**
-	 * KeyBy operation for connected data stream using key expressions. Assigns
-	 * keys to the elements of input1 and input2 according to fields1 and
-	 * fields2. A field expression is either the name of a public field or a
-	 * getter method with parentheses of the {@link DataStream}'s underlying
-	 * type. A dot can be used to drill down into objects, as in
-	 * {@code "field1.getInnerField2()" }.
-	 *
-	 * @param fields1
-	 *            The grouping expressions for the first input
-	 * @param fields2
-	 *            The grouping expressions for the second input
-	 * @return The grouped {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) {
-		return new ConnectedStreams<>(environment, inputStream1.keyBy(fields1),
-				inputStream2.keyBy(fields2));
-	}
-
-	/**
-	 * KeyBy operation for connected data stream. Assigns keys to the elements of
-	 * input1 and input2 using keySelector1 and keySelector2.
-	 *
-	 * @param keySelector1
-	 *            The {@link KeySelector} used for grouping the first input
-	 * @param keySelector2
-	 *            The {@link KeySelector} used for grouping the second input
-	 * @return The partitioned {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
-		return new ConnectedStreams<>(environment, inputStream1.keyBy(keySelector1),
-				inputStream2.keyBy(keySelector2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream. Partitions the elements of
-	 * input1 and input2 according to keyPosition1 and keyPosition2.
-	 *
-	 * @param keyPosition1
-	 *            The field used to compute the hashcode of the elements in the
-	 *            first input stream.
-	 * @param keyPosition2
-	 *            The field used to compute the hashcode of the elements in the
-	 *            second input stream.
-	 * @return The partitioned {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> partitionByHash(int keyPosition1, int keyPosition2) {
-		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPosition1),
-				inputStream2.partitionByHash(keyPosition2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream. Partitions the elements of
-	 * input1 and input2 according to keyPositions1 and keyPositions2.
-	 *
-	 * @param keyPositions1
-	 *            The fields used to group the first input stream.
-	 * @param keyPositions2
-	 *            The fields used to group the second input stream.
-	 * @return The partitioned {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> partitionByHash(int[] keyPositions1, int[] keyPositions2) {
-		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPositions1),
-				inputStream2.partitionByHash(keyPositions2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream using key expressions. Partitions
-	 * the elements of input1 and input2 according to field1 and field2. A
-	 * field expression is either the name of a public field or a getter method
-	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
-	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field1
-	 *            The partitioning expression for the first input
-	 * @param field2
-	 *            The partitioning expression for the second input
-	 * @return The partitioned {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> partitionByHash(String field1, String field2) {
-		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(field1),
-				inputStream2.partitionByHash(field2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream using key expressions. Partitions
-	 * the elements of input1 and input2 according to fields1 and fields2. A
-	 * field expression is either the name of a public field or a getter method
-	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
-	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param fields1
-	 *            The partitioning expressions for the first input
-	 * @param fields2
-	 *            The partitioning expressions for the second input
-	 * @return The partitioned {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> partitionByHash(String[] fields1, String[] fields2) {
-		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(fields1),
-				inputStream2.partitionByHash(fields2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream. Partitions the elements of
-	 * input1 and input2 using keySelector1 and keySelector2.
-	 *
-	 * @param keySelector1
-	 *            The {@link KeySelector} used for partitioning the first input
-	 * @param keySelector2
-	 *            The {@link KeySelector} used for partitioning the second input
-	 * @return The partitioned {@link ConnectedStreams}
-	 */
-	public ConnectedStreams<IN1, IN2> partitionByHash(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
-		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keySelector1),
-				inputStream2.partitionByHash(keySelector2));
-	}
-
-	/**
-	 * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
-	 * the output to a common type. The transformation calls a
-	 * {@link CoMapFunction#map1} for each element of the first input and
-	 * {@link CoMapFunction#map2} for each element of the second input. Each
-	 * CoMapFunction call returns exactly one element.
-	 * 
-	 * @param coMapper The CoMapFunction used to jointly transform the two input DataStreams
-	 * @return The transformed {@link DataStream}
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
-
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
-				CoMapFunction.class, false, true, getType1(), getType2(),
-				Utils.getCallLocationName(), true);
-
-		return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));
-
-	}
-
-	/**
-	 * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
-	 * maps the output to a common type. The transformation calls a
-	 * {@link CoFlatMapFunction#flatMap1} for each element of the first input
-	 * and {@link CoFlatMapFunction#flatMap2} for each element of the second
-	 * input. Each CoFlatMapFunction call returns any number of elements
-	 * including none.
-	 * 
-	 * @param coFlatMapper
-	 *            The CoFlatMapFunction used to jointly transform the two input
-	 *            DataStreams
-	 * @return The transformed {@link DataStream}
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
-			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
-
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
-				CoFlatMapFunction.class, false, true, getType1(), getType2(),
-				Utils.getCallLocationName(), true);
-
-		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
-	}
-
-	public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
-			TypeInformation<OUT> outTypeInfo,
-			TwoInputStreamOperator<IN1, IN2, OUT> operator) {
-
-		// read the output type of the input Transforms to coax out errors about MissingTypeInfo
-		inputStream1.getType();
-		inputStream2.getType();
-
-		TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(
-				inputStream1.getTransformation(),
-				inputStream2.getTransformation(),
-				functionName,
-				operator,
-				outTypeInfo,
-				environment.getParallelism());
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(environment, transform);
-
-		getExecutionEnvironment().addOperator(transform);
-
-		return returnStream;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
deleted file mode 100644
index 176a07f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ /dev/null
@@ -1,1077 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.io.TextOutputFormat;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamFilter;
-import org.apache.flink.streaming.api.operators.StreamFlatMap;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.api.transformations.UnionTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A DataStream represents a stream of elements of the same type. A DataStream
- * can be transformed into another DataStream by applying a transformation,
- * for example:
- * <ul>
- * <li>{@link DataStream#map}, or
- * <li>{@link DataStream#filter}.
- * </ul>
- * 
- * @param <T> The type of the elements in this Stream
- */
-public class DataStream<T> {
-
-	protected final StreamExecutionEnvironment environment;
-
-	protected final StreamTransformation<T> transformation;
-
-	/**
-	 * Create a new {@link DataStream} in the given execution environment with
-	 * partitioning set to forward by default.
-	 *
-	 * @param environment The StreamExecutionEnvironment
-	 */
-	public DataStream(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
-		this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
-		this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
-	}
-
-	/**
-	 * Returns the ID of the {@link DataStream} in the current {@link StreamExecutionEnvironment}.
-	 * 
-	 * @return ID of the DataStream
-	 */
-	public Integer getId() {
-		return transformation.getId();
-	}
-
-	/**
-	 * Gets the parallelism for this operator.
-	 * 
-	 * @return The parallelism set for this operator.
-	 */
-	public int getParallelism() {
-		return transformation.getParallelism();
-	}
-
-	/**
-	 * Gets the type of the stream.
-	 * 
-	 * @return The type of the datastream.
-	 */
-	public TypeInformation<T> getType() {
-		return transformation.getOutputType();
-	}
-
-	/**
-	 * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
-	 * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
-	 *
-	 * @return The cleaned Function
-	 */
-	protected <F> F clean(F f) {
-		return getExecutionEnvironment().clean(f);
-	}
-
-	/**
-	 * Returns the {@link StreamExecutionEnvironment} that was used to create this
-	 * {@link DataStream}
-	 *
-	 * @return The Execution Environment
-	 */
-	public StreamExecutionEnvironment getExecutionEnvironment() {
-		return environment;
-	}
-
-	public ExecutionConfig getExecutionConfig() {
-		return environment.getConfig();
-	}
-
-	/**
-	 * Creates a new {@link DataStream} by merging {@link DataStream} outputs of
-	 * the same type with each other. The DataStreams merged using this operator
-	 * will be transformed simultaneously.
-	 * 
-	 * @param streams
-	 *            The DataStreams to union output with.
-	 * @return The {@link DataStream}.
-	 */
-	@SafeVarargs
-	public final DataStream<T> union(DataStream<T>... streams) {
-		List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
-		unionedTransforms.add(this.transformation);
-
-		Collection<StreamTransformation<?>> thisPredecessors = this.getTransformation().getTransitivePredecessors();
-
-		for (DataStream<T> newStream : streams) {
-			if (!(newStream.getParallelism() == this.getParallelism())) {
-				throw new UnsupportedClassVersionError(
-						"DataStream can only be unioned with DataStreams of the same parallelism. " +
-								"This Stream: " + this.getTransformation() +
-								", other stream: " + newStream.getTransformation());
-			}
-			if (!getType().equals(newStream.getType())) {
-				throw new IllegalArgumentException("Cannot union streams of different types: "
-						+ getType() + " and " + newStream.getType());
-			}
-			
-			Collection<StreamTransformation<?>> predecessors = newStream.getTransformation().getTransitivePredecessors();
-
-			if (predecessors.contains(this.transformation) || thisPredecessors.contains(newStream.getTransformation())) {
-				throw new UnsupportedOperationException("A DataStream cannot be unioned with itself");
-			}
-			unionedTransforms.add(newStream.getTransformation());
-		}
-		return new DataStream<T>(this.environment, new UnionTransformation<T>(unionedTransforms));
-	}
-
-	/**
-	 * Operator used for directing tuples to specific named outputs using an
-	 * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
-	 * Calling this method on an operator creates a new {@link SplitStream}.
-	 * 
-	 * @param outputSelector
-	 *            The user defined
-	 *            {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
-	 *            for directing the tuples.
-	 * @return The {@link SplitStream}
-	 */
-	public SplitStream<T> split(OutputSelector<T> outputSelector) {
-		return new SplitStream<T>(this, clean(outputSelector));
-	}
-
-	/**
-	 * Creates a new {@link ConnectedStreams} by connecting
-	 * {@link DataStream} outputs of (possibly) different types with each other.
-	 * The DataStreams connected using this operator can be used with
-	 * CoFunctions to apply joint transformations.
-	 * 
-	 * @param dataStream
-	 *            The DataStream with which this stream will be connected.
-	 * @return The {@link ConnectedStreams}.
-	 */
-	public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
-		return new ConnectedStreams<T, R>(environment, this, dataStream);
-	}
-
-	/**
-	 * 
-	 * It creates a new {@link KeyedStream} that uses the provided key for partitioning
-	 * its operator states. 
-	 *
-	 * @param key
-	 *            The KeySelector to be used for extracting the key for partitioning
-	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
-	 */
-	public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
-		return new KeyedStream<T, K>(this, clean(key));
-	}
-
-	/**
-	 * Partitions the operator state of a {@link DataStream} by the given key positions. 
-	 *
-	 * @param fields
-	 *            The position of the fields on which the {@link DataStream}
-	 *            will be grouped.
-	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
-	 */
-	public KeyedStream<T, Tuple> keyBy(int... fields) {
-		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
-		} else {
-			return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
-		}
-	}
-
-	/**
-	 * Partitions the operator state of a {@link DataStream} using field expressions.
-	 * A field expression is either the name of a public field or a getter method with parentheses
-	 * of the {@link DataStream}'s underlying type. A dot can be used to drill
-	 * down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param fields
-	 *            One or more field expressions on which the state of the {@link DataStream} operators will be
-	 *            partitioned.
-	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
-	 **/
-	public KeyedStream<T, Tuple> keyBy(String... fields) {
-		return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
-	}
-
-	private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
-		return new KeyedStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
-				getType(), getExecutionConfig())));
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output is
-	 * partitioned by hashing on the given fields. This setting only
-	 * affects how the outputs will be distributed between the parallel
-	 * instances of the next processing operator.
-	 *
-	 * @param fields The tuple fields that should be used for partitioning
-	 * @return The partitioned DataStream
-	 *
-	 */
-	public DataStream<T> partitionByHash(int... fields) {
-		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return partitionByHash(KeySelectorUtil.getSelectorForArray(fields, getType()));
-		} else {
-			return partitionByHash(new Keys.ExpressionKeys<T>(fields, getType()));
-		}
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output is
-	 * partitioned by hashing on the given fields. This setting only
-	 * affects how the outputs will be distributed between the parallel
-	 * instances of the next processing operator.
-	 *
-	 * @param fields The tuple fields that should be used for partitioning
-	 * @return The partitioned DataStream
-	 *
-	 */
-	public DataStream<T> partitionByHash(String... fields) {
-		return partitionByHash(new Keys.ExpressionKeys<T>(fields, getType()));
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output is
-	 * partitioned using the given {@link KeySelector}. This setting only
-	 * affects how the outputs will be distributed between the parallel
-	 * instances of the next processing operator.
-	 *
-	 * @param keySelector The function that extracts the key from an element in the Stream
-	 * @return The partitioned DataStream
-	 */
-	public DataStream<T> partitionByHash(KeySelector<T, ?> keySelector) {
-		return setConnectionType(new HashPartitioner<T>(clean(keySelector)));
-	}
-
-	//private helper method for partitioning
-	private DataStream<T> partitionByHash(Keys<T> keys) {
-		KeySelector<T, ?> keySelector = clean(KeySelectorUtil.getSelectorForKeys(
-				keys,
-				getType(),
-				getExecutionConfig()));
-
-		return setConnectionType(new HashPartitioner<T>(keySelector));
-	}
-
-	/**
-	 * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
-	 * This method takes the key position to partition on, and a partitioner that accepts the key type.
-	 * <p>
-	 * Note: This method works only on single field keys.
-	 *
-	 * @param partitioner The partitioner to assign partitions to keys.
-	 * @param field The field index on which the DataStream is to be partitioned.
-	 * @return The partitioned DataStream.
-	 */
-	public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, int field) {
-		Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<T>(new int[]{field}, getType());
-		return partitionCustom(partitioner, outExpressionKeys);
-	}
-
-	/**
-	 * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
-	 * This method takes the key expression to partition on, and a partitioner that accepts the key type.
-	 * <p>
-	 * Note: This method works only on single field keys.
-	 *
-	 * @param partitioner The partitioner to assign partitions to keys.
-	 * @param field The field expression on which the DataStream is to be partitioned.
-	 * @return The partitioned DataStream.
-	 */
-	public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
-		Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<T>(new String[]{field}, getType());
-		return partitionCustom(partitioner, outExpressionKeys);
-	}
-
-
-	/**
-	 * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
-	 * This method takes the key selector to get the key to partition on, and a partitioner that
-	 * accepts the key type.
-	 * <p>
-	 * Note: This method works only on single field keys, i.e. the selector cannot return tuples
-	 * of fields.
-	 *
-	 * @param partitioner
-	 * 		The partitioner to assign partitions to keys.
-	 * @param keySelector
-	 * 		The KeySelector with which the DataStream is partitioned.
-	 * @return The partitioned DataStream.
-	 * @see KeySelector
-	 */
-	public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
-		return setConnectionType(new CustomPartitionerWrapper<K, T>(clean(partitioner),
-				clean(keySelector)));
-	}
-
-	//	private helper method for custom partitioning
-	private <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, Keys<T> keys) {
-		KeySelector<T, K> keySelector = KeySelectorUtil.getSelectorForOneKey(keys, partitioner, getType(), getExecutionConfig());
-
-		return setConnectionType(
-				new CustomPartitionerWrapper<K, T>(
-						clean(partitioner),
-						clean(keySelector)));
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are broadcasted to every parallel instance of the next component.
-	 *
-	 * <p>
-	 * This setting only affects how the outputs will be distributed between
-	 * the parallel instances of the next processing operator.
-	 * 
-	 * @return The DataStream with broadcast partitioning set.
-	 */
-	public DataStream<T> broadcast() {
-		return setConnectionType(new BroadcastPartitioner<T>());
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are shuffled uniformly randomly to the next component.
-	 *
-	 * <p>
-	 * This setting only affects how the outputs will be distributed between
-	 * the parallel instances of the next processing operator.
-	 * 
-	 * @return The DataStream with shuffle partitioning set.
-	 */
-	public DataStream<T> shuffle() {
-		return setConnectionType(new ShufflePartitioner<T>());
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are forwarded to the local subtask of the next component (whenever
-	 * possible).
-	 *
-	 * <p>
-	 * This setting only affects how the outputs will be distributed between
-	 * the parallel instances of the next processing operator.
-	 * 
-	 * @return The DataStream with forward partitioning set.
-	 */
-	public DataStream<T> forward() {
-		return setConnectionType(new ForwardPartitioner<T>());
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are distributed evenly to instances of the next component in a Round-robin
-	 * fashion.
-	 *
-	 * <p>
-	 * This setting only affects how the outputs will be distributed between
-	 * the parallel instances of the next processing operator.
-	 * 
-	 * @return The DataStream with rebalance partitioning set.
-	 */
-	public DataStream<T> rebalance() {
-		return setConnectionType(new RebalancePartitioner<T>());
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output values
-	 * all go to the first instance of the next processing operator. Use this
-	 * setting with care since it might cause a serious performance bottleneck
-	 * in the application.
-	 * 
-	 * @return The DataStream with global partitioning set.
-	 */
-	public DataStream<T> global() {
-		return setConnectionType(new GlobalPartitioner<T>());
-	}
-
-	/**
-	 * Initiates an iterative part of the program that feeds back data streams.
-	 * The iterative part needs to be closed by calling
-	 * {@link IterativeStream#closeWith(DataStream)}. The transformation of
-	 * this IterativeStream will be the iteration head. The data stream
-	 * given to the {@link IterativeStream#closeWith(DataStream)} method is
-	 * the data stream that will be fed back and used as the input for the
-	 * iteration head. The user can also use different feedback type than the
-	 * input of the iteration and treat the input and feedback streams as a
-	 * {@link ConnectedStreams} by calling
-	 * {@link IterativeStream#withFeedbackType(TypeInformation)}
-	 * <p>
-	 * A common usage pattern for streaming iterations is to use output
-	 * splitting to send a part of the closing data stream to the head. Refer to
-	 * {@link #split(OutputSelector)} for more information.
-	 * <p>
-	 * The iteration edge will be partitioned the same way as the first input of
-	 * the iteration head unless it is changed in the
-	 * {@link IterativeStream#closeWith(DataStream)} call.
-	 * <p>
-	 * By default a DataStream with iteration will never terminate, but the user
-	 * can use the maxWaitTime parameter to set a max waiting time for the
-	 * iteration head. If no data received in the set time, the stream
-	 * terminates.
-	 * 
-	 * @return The iterative data stream created.
-	 */
-	public IterativeStream<T> iterate() {
-		return new IterativeStream<T>(this, 0);
-	}
-
-	/**
-	 * Initiates an iterative part of the program that feeds back data streams.
-	 * The iterative part needs to be closed by calling
-	 * {@link IterativeStream#closeWith(DataStream)}. The transformation of
-	 * this IterativeStream will be the iteration head. The data stream
-	 * given to the {@link IterativeStream#closeWith(DataStream)} method is
-	 * the data stream that will be fed back and used as the input for the
-	 * iteration head. The user can also use different feedback type than the
-	 * input of the iteration and treat the input and feedback streams as a
-	 * {@link ConnectedStreams} by calling
-	 * {@link IterativeStream#withFeedbackType(TypeInformation)}
-	 * <p>
-	 * A common usage pattern for streaming iterations is to use output
-	 * splitting to send a part of the closing data stream to the head. Refer to
-	 * {@link #split(OutputSelector)} for more information.
-	 * <p>
-	 * The iteration edge will be partitioned the same way as the first input of
-	 * the iteration head unless it is changed in the
-	 * {@link IterativeStream#closeWith(DataStream)} call.
-	 * <p>
-	 * By default a DataStream with iteration will never terminate, but the user
-	 * can use the maxWaitTime parameter to set a max waiting time for the
-	 * iteration head. If no data received in the set time, the stream
-	 * terminates.
-	 * 
-	 * @param maxWaitTimeMillis
-	 *            Number of milliseconds to wait between inputs before shutting
-	 *            down
-	 * 
-	 * @return The iterative data stream created.
-	 */
-	public IterativeStream<T> iterate(long maxWaitTimeMillis) {
-		return new IterativeStream<T>(this, maxWaitTimeMillis);
-	}
-
-	/**
-	 * Applies a Map transformation on a {@link DataStream}. The transformation
-	 * calls a {@link MapFunction} for each element of the DataStream. Each
-	 * MapFunction call returns exactly one element. The user can also extend
-	 * {@link RichMapFunction} to gain access to other features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * 
-	 * @param mapper
-	 *            The MapFunction that is called for each element of the
-	 *            DataStream.
-	 * @param <R>
-	 *            output type
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<T, R> mapper) {
-
-		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
-				Utils.getCallLocationName(), true);
-
-		return transform("Map", outType, new StreamMap<T, R>(clean(mapper)));
-	}
-
-	/**
-	 * Applies a FlatMap transformation on a {@link DataStream}. The
-	 * transformation calls a {@link FlatMapFunction} for each element of the
-	 * DataStream. Each FlatMapFunction call can return any number of elements
-	 * including none. The user can also extend {@link RichFlatMapFunction} to
-	 * gain access to other features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * 
-	 * @param flatMapper
-	 *            The FlatMapFunction that is called for each element of the
-	 *            DataStream
-	 * 
-	 * @param <R>
-	 *            output type
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<T, R> flatMapper) {
-
-		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
-				getType(), Utils.getCallLocationName(), true);
-
-		return transform("Flat Map", outType, new StreamFlatMap<T, R>(clean(flatMapper)));
-
-	}
-
-	/**
-	 * Applies a Filter transformation on a {@link DataStream}. The
-	 * transformation calls a {@link FilterFunction} for each element of the
-	 * DataStream and retains only those elements for which the function returns
-	 * true. Elements for which the function returns false are filtered. The
-	 * user can also extend {@link RichFilterFunction} to gain access to other
-	 * features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * 
-	 * @param filter
-	 *            The FilterFunction that is called for each element of the
-	 *            DataStream.
-	 * @return The filtered DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> filter(FilterFunction<T> filter) {
-		return transform("Filter", getType(), new StreamFilter<T>(clean(filter)));
-
-	}
-
-	/**
-	 * Initiates a Project transformation on a {@link Tuple} {@link DataStream}.<br/>
-	 * <b>Note: Only Tuple DataStreams can be projected.</b>
-	 *
-	 * <p>
-	 * The transformation projects each Tuple of the DataStream onto a (sub)set of
-	 * fields.
-	 * 
-	 * @param fieldIndexes
-	 *            The field indexes of the input tuples that are retained. The
-	 *            order of fields in the output tuple corresponds to the order
-	 *            of field indexes.
-	 * @return The projected DataStream
-	 * 
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <R extends Tuple> SingleOutputStreamOperator<R, ?> project(int... fieldIndexes) {
-		return new StreamProjection<T>(this, fieldIndexes).projectTupleX();
-	}
-
-	/**
-	 * Creates a co-group operation. See {@link CoGroupedStreams} for an example of how the keys
-	 * and window can be specified.
-	 */
-	public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
-		return new CoGroupedStreams<>(this, otherStream);
-	}
-
-	/**
-	 * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
-	 * and window can be specified.
-	 */
-	public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
-		return new JoinedStreams<>(this, otherStream);
-	}
-
-	/**
-	 * Windows this {@code DataStream} into tumbling time windows.
-	 *
-	 * <p>
-	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
-	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
-	 * set using
-	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
-	 *
-	 * <p>
-	 * Note: This operation can be inherently non-parallel since all elements have to pass through
-	 * the same operator instance. (Only for special cases, such as aligned time windows is
-	 * it possible to perform this operation in parallel).
-	 *
-	 * @param size The size of the window.
-	 */
-	public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
-		return windowAll(TumblingTimeWindows.of(size));
-	}
-
-	/**
-	 * Windows this {@code DataStream} into sliding time windows.
-	 *
-	 * <p>
-	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
-	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
-	 * set using
-	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
-	 *
-	 * <p>
-	 * Note: This operation can be inherently non-parallel since all elements have to pass through
-	 * the same operator instance. (Only for special cases, such as aligned time windows is
-	 * it possible to perform this operation in parallel).
-	 *
-	 * @param size The size of the window.
-	 */
-	public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
-		return windowAll(SlidingTimeWindows.of(size, slide));
-	}
-
-	/**
-	 * Windows this {@code DataStream} into tumbling count windows.
-	 *
-	 * <p>
-	 * Note: This operation can be inherently non-parallel since all elements have to pass through
-	 * the same operator instance. (Only for special cases, such as aligned time windows is
-	 * it possible to perform this operation in parallel).
-	 *
-	 * @param size The size of the windows in number of elements.
-	 */
-	public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
-		return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
-	}
-
-	/**
-	 * Windows this {@code DataStream} into sliding count windows.
-	 *
-	 * <p>
-	 * Note: This operation can be inherently non-parallel since all elements have to pass through
-	 * the same operator instance. (Only for special cases, such as aligned time windows is
-	 * it possible to perform this operation in parallel).
-	 *
-	 * @param size The size of the windows in number of elements.
-	 * @param slide The slide interval in number of elements.
-	 */
-	public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
-		return windowAll(GlobalWindows.create())
-				.evictor(CountEvictor.of(size))
-				.trigger(CountTrigger.of(slide));
-	}
-
-	/**
-	 * Windows this data stream to an {@code AllWindowedStream}, which evaluates windows
-	 * over a non-keyed stream. Elements are put into windows by a
-	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of
-	 * elements is done by window only.
-	 *
-	 * <p>
-	 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
-	 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
-	 * that is used if a {@code Trigger} is not specified.
-	 *
-	 * <p>
-	 * Note: This operation can be inherently non-parallel since all elements have to pass through
-	 * the same operator instance. (Only for special cases, such as aligned time windows is
-	 * it possible to perform this operation in parallel).
-	 *
-	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
-	 * @return The trigger windows data stream.
-	 */
-	public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
-		return new AllWindowedStream<>(this, assigner);
-	}
-
-	/**
-	 * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
-	 * The internal timestamps are, for example, used for event-time window operations.
-	 *
-	 * <p>
-	 * If you know that the timestamps are strictly increasing you can use an
-	 * {@link org.apache.flink.streaming.api.functions.AscendingTimestampExtractor}. Otherwise,
-	 * you should provide a {@link TimestampExtractor} that also implements
-	 * {@link TimestampExtractor#getCurrentWatermark()} to keep track of watermarks.
-	 *
-	 * @see org.apache.flink.streaming.api.watermark.Watermark
-	 *
-	 * @param extractor The TimestampExtractor that is called for each element of the DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> assignTimestamps(TimestampExtractor<T> extractor) {
-		// match parallelism to input, otherwise dop=1 sources could lead to some strange
-		// behaviour: the watermark will creep along very slowly because the elements
-		// from the source go to each extraction operator round robin.
-		int inputParallelism = getTransformation().getParallelism();
-		ExtractTimestampsOperator<T> operator = new ExtractTimestampsOperator<>(clean(extractor));
-		return transform("ExtractTimestamps", getTransformation().getOutputType(), operator)
-				.setParallelism(inputParallelism);
-	}
-
-	/**
-	 * Writes a DataStream to the standard output stream (stdout).
-	 *
-	 * <p>
-	 * For each element of the DataStream the result of
-	 * {@link Object#toString()} is written.
-	 * 
-	 * @return The closed DataStream.
-	 */
-	public DataStreamSink<T> print() {
-		PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>();
-		return addSink(printFunction);
-	}
-
-	/**
-	 * Writes a DataStream to the standard error stream (stderr).
-	 *
-	 * <p>
-	 * For each element of the DataStream the result of
-	 * {@link Object#toString()} is written.
-	 * 
-	 * @return The closed DataStream.
-	 */
-	public DataStreamSink<T> printToErr() {
-		PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>(true);
-		return addSink(printFunction);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format.
-	 *
-	 * <p>
-	 * For every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
-	 * 
-	 * @param path
-	 *            the path pointing to the location the text file is written to
-	 * 
-	 * @return the closed DataStream.
-	 */
-	public DataStreamSink<T> writeAsText(String path) {
-		return write(new TextOutputFormat<T>(new Path(path)), 0L);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically, in every millis milliseconds.
-	 *
-	 * <p>
-	 * For every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
-	 * 
-	 * @param path
-	 *            the path pointing to the location the text file is written to
-	 * @param millis
-	 *            the file update frequency
-	 * 
-	 * @return the closed DataStream
-	 */
-	public DataStreamSink<T> writeAsText(String path, long millis) {
-		TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
-		return write(tof, millis);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format.
-	 *
-	 * <p>
-	 * For every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
-	 * 
-	 * @param path
-	 *            the path pointing to the location the text file is written to
-	 * @param writeMode
-	 *            Controls the behavior for existing files. Options are
-	 *            NO_OVERWRITE and OVERWRITE.
-	 * 
-	 * @return the closed DataStream.
-	 */
-	public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
-		TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
-		tof.setWriteMode(writeMode);
-		return write(tof, 0L);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format.
-	 *
-	 * <p>
-	 * For every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
-	 * 
-	 * @param path
-	 *            the path pointing to the location the text file is written to
-	 * @param writeMode
-	 *            Controls the behavior for existing files. Options are
-	 *            NO_OVERWRITE and OVERWRITE.
-	 * @param millis
-	 *            the file update frequency
-	 * 
-	 * @return the closed DataStream.
-	 */
-	public DataStreamSink<T> writeAsText(String path, WriteMode writeMode, long millis) {
-		TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
-		tof.setWriteMode(writeMode);
-		return write(tof, millis);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in csv format.
-	 *
-	 * <p>
-	 * For every field of an element of the DataStream the result of {@link Object#toString()}
-	 * is written. This method can only be used on data streams of tuples.
-	 * 
-	 * @param path
-	 *            the path pointing to the location the text file is written to
-	 * 
-	 * @return the closed DataStream
-	 */
-	@SuppressWarnings("unchecked")
-	public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path) {
-		Preconditions.checkArgument(getType().isTupleType(),
-				"The writeAsCsv() method can only be used on data sets of tuples.");
-		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
-				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-		return write((OutputFormat<T>) of, 0L);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in csv format. The
-	 * writing is performed periodically, in every millis milliseconds.
-	 *
-	 * <p>
-	 * For every field of an element of the DataStream the result of {@link Object#toString()}
-	 * is written. This method can only be used on data streams of tuples.
-	 *
-	 * @param path
-	 *            the path pointing to the location the text file is written to
-	 * @param millis
-	 *            the file update frequency
-	 * 
-	 * @return the closed DataStream
-	 */
-	@SuppressWarnings("unchecked")
-	public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, long millis) {
-		Preconditions.checkArgument(getType().isTupleType(),
-				"The writeAsCsv() method can only be used on data sets of tuples.");
-		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
-				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-		return write((OutputFormat<T>) of, millis);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in csv format.
-	 *
-	 * <p>
-	 * For every field of an element of the DataStream the result of {@link Object#toString()}
-	 * is written. This method can only be used on data streams of tuples.
-	 * 
-	 * @param path
-	 *            the path pointing to the location the text file is written to
-	 * @param writeMode
-	 *            Controls the behavior for existing files. Options are
-	 *            NO_OVERWRITE and OVERWRITE.
-	 * 
-	 * @return the closed DataStream
-	 */
-	@SuppressWarnings("unchecked")
-	public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode) {
-		Preconditions.checkArgument(getType().isTupleType(),
-				"The writeAsCsv() method can only be used on data sets of tuples.");
-		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
-				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-		if (writeMode != null) {
-			of.setWriteMode(writeMode);
-		}
-		return write((OutputFormat<T>) of, 0L);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in csv format. The
-	 * writing is performed periodically, in every millis milliseconds.
-	 *
-	 * <p>
-	 * For every field of an element of the DataStream the result of {@link Object#toString()}
-	 * is written. This method can only be used on data streams of tuples.
-	 * 
-	 * @param path
-	 *            the path pointing to the location the text file is written to
-	 * @param writeMode
-	 *            Controls the behavior for existing files. Options are
-	 *            NO_OVERWRITE and OVERWRITE.
-	 * @param millis
-	 *            the file update frequency
-	 * 
-	 * @return the closed DataStream
-	 */
-	@SuppressWarnings("unchecked")
-	public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode,
-			long millis) {
-		Preconditions.checkArgument(getType().isTupleType(),
-				"The writeAsCsv() method can only be used on data sets of tuples.");
-		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
-				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-		if (writeMode != null) {
-			of.setWriteMode(writeMode);
-		}
-		return write((OutputFormat<T>) of, millis);
-	}
-
-	/**
-	 * Writes the DataStream to a socket as a byte array. The format of the
-	 * output is specified by a {@link SerializationSchema}.
-	 * 
-	 * @param hostName
-	 *            host of the socket
-	 * @param port
-	 *            port of the socket
-	 * @param schema
-	 *            schema for serialization
-	 * @return the closed DataStream
-	 */
-	public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema) {
-		DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
-		returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
-		return returnStream;
-	}
-	
-	/**
-	 * Writes the dataStream into an output, described by an OutputFormat.
-	 * 
-	 * @param format The output format
-	 * @param millis the write frequency
-	 * @return The closed DataStream
-	 */
-	public DataStreamSink<T> write(OutputFormat<T> format, long millis) {
-		return addSink(new FileSinkFunctionByMillis<T>(format, millis));
-	}
-
-	/**
-	 * Method for passing user defined operators along with the type
-	 * information that will transform the DataStream.
-	 * 
-	 * @param operatorName
-	 *            name of the operator, for logging purposes
-	 * @param outTypeInfo
-	 *            the output type of the operator
-	 * @param operator
-	 *            the object containing the transformation logic
-	 * @param <R>
-	 *            type of the return stream
-	 * @return the data stream constructed
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
-
-		// read the output type of the input Transform to coax out errors about MissingTypeInfo
-		transformation.getOutputType();
-
-		OneInputTransformation<T, R> resultTransform = new OneInputTransformation<T, R>(
-				this.transformation,
-				operatorName,
-				operator,
-				outTypeInfo,
-				environment.getParallelism());
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
-
-		getExecutionEnvironment().addOperator(resultTransform);
-
-		return returnStream;
-	}
-
-	/**
-	 * Internal function for setting the partitioner for the DataStream
-	 *
-	 * @param partitioner
-	 *            Partitioner to set.
-	 * @return The modified DataStream.
-	 */
-	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
-		return new DataStream<T>(this.getExecutionEnvironment(), new PartitionTransformation<T>(this.getTransformation(), partitioner));
-	}
-
-	/**
-	 * Adds the given sink to this DataStream. Only streams with sinks added
-	 * will be executed once the {@link StreamExecutionEnvironment#execute()}
-	 * method is called.
-	 * 
-	 * @param sinkFunction
-	 *            The object containing the sink's invoke function.
-	 * @return The closed DataStream.
-	 */
-	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
-
-		// read the output type of the input Transform to coax out errors about MissingTypeInfo
-		transformation.getOutputType();
-
-		// configure the type if needed
-		if (sinkFunction instanceof InputTypeConfigurable) {
-			((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig() );
-		}
-
-		StreamSink<T> sinkOperator = new StreamSink<T>(clean(sinkFunction));
-
-		DataStreamSink<T> sink = new DataStreamSink<T>(this, sinkOperator);
-
-		getExecutionEnvironment().addOperator(sink.getTransformation());
-		return sink;
-	}
-
-	/**
-	 * Returns the {@link StreamTransformation} that represents the operation that logically creates
-	 * this {@link DataStream}.
-	 *
-	 * @return The Transformation
-	 */
-	public StreamTransformation<T> getTransformation() {
-		return transformation;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
deleted file mode 100644
index 24104ad..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
-
-/**
- * A Stream Sink. This is used for emitting elements from a streaming topology.
- *
- * @param <T> The type of the elements in the Stream
- */
-public class DataStreamSink<T> {
-
-	SinkTransformation<T> transformation;
-
-	@SuppressWarnings("unchecked")
-	protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
-		this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
-	}
-
-	/**
-	 * Returns the transformation that contains the actual sink operator of this sink.
-	 */
-	public SinkTransformation<T> getTransformation() {
-		return transformation;
-	}
-
-	/**
-	 * Sets the name of this sink. This name is
-	 * used by the visualization and logging during runtime.
-	 *
-	 * @return The named sink.
-	 */
-	public DataStreamSink<T> name(String name) {
-		transformation.setName(name);
-		return this;
-	}
-
-	/**
-	 * Sets the parallelism for this sink. The degree must be higher than zero.
-	 *
-	 * @param parallelism The parallelism for this sink.
-	 * @return The sink with set parallelism.
-	 */
-	public DataStreamSink<T> setParallelism(int parallelism) {
-		transformation.setParallelism(parallelism);
-		return this;
-	}
-
-	/**
-	 * Turns off chaining for this operator so thread co-location will not be
-	 * used as an optimization.
-	 *
-	 * <p>
-	 * Chaining can be turned off for the whole
-	 * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
-	 * however it is not advised for performance considerations.
-	 *
-	 * @return The sink with chaining disabled
-	 */
-	public DataStreamSink<T> disableChaining() {
-		this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
-		return this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
deleted file mode 100644
index d2e04a7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-
-/**
- * The DataStreamSource represents the starting point of a DataStream.
- * 
- * @param <T> Type of the elements in the DataStream created from this source.
- */
-public class DataStreamSource<T> extends SingleOutputStreamOperator<T, DataStreamSource<T>> {
-
-	boolean isParallel;
-
-	public DataStreamSource(StreamExecutionEnvironment environment,
-			TypeInformation<T> outTypeInfo, StreamSource<T> operator,
-			boolean isParallel, String sourceName) {
-		super(environment, new SourceTransformation<T>(sourceName, operator, outTypeInfo, environment.getParallelism()));
-
-		this.isParallel = isParallel;
-		if (!isParallel) {
-			setParallelism(1);
-		}
-	}
-
-	@Override
-	public DataStreamSource<T> setParallelism(int parallelism) {
-		if (parallelism > 1 && !isParallel) {
-			throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
-		} else {
-			return (DataStreamSource<T>) super.setParallelism(parallelism);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
deleted file mode 100644
index 346bef9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-
-import java.util.Collection;
-
-/**
- * The iterative data stream represents the start of an iteration in a {@link DataStream}.
- * 
- * @param <T> Type of the elements in this Stream
- */
-public class IterativeStream<T> extends SingleOutputStreamOperator<T, IterativeStream<T>> {
-
-	// We store these so that we can create a co-iteration if we need to
-	private DataStream<T> originalInput;
-	private long maxWaitTime;
-	
-	protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
-		super(dataStream.getExecutionEnvironment(),
-				new FeedbackTransformation<T>(dataStream.getTransformation(), maxWaitTime));
-		this.originalInput = dataStream;
-		this.maxWaitTime = maxWaitTime;
-		setBufferTimeout(dataStream.environment.getBufferTimeout());
-	}
-
-	/**
-	 * Closes the iteration. This method defines the end of the iterative
-	 * program part that will be fed back to the start of the iteration.
-	 *
-	 * <p>
-	 * A common usage pattern for streaming iterations is to use output
-	 * splitting to send a part of the closing data stream to the head. Refer to
-	 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
-	 * for more information.
-	 * 
-	 * @param feedbackStream
-	 *            {@link DataStream} that will be used as input to the iteration
-	 *            head.
-	 *
-	 * @return The feedback stream.
-	 * 
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public DataStream<T> closeWith(DataStream<T> feedbackStream) {
-
-		Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
-
-		if (!predecessors.contains(this.transformation)) {
-			throw new UnsupportedOperationException(
-					"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
-		}
-
-		((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());
-
-		return feedbackStream;
-	}
-
-	/**
-	 * Changes the feedback type of the iteration and allows the user to apply
-	 * co-transformations on the input and feedback stream, as in a
-	 * {@link ConnectedStreams}.
-	 *
-	 * <p>
-	 * For type safety the user needs to define the feedback type
-	 * 
-	 * @param feedbackTypeString
-	 *            String describing the type information of the feedback stream.
-	 * @return A {@link ConnectedIterativeStreams}.
-	 */
-	public <F> ConnectedIterativeStreams<T, F> withFeedbackType(String feedbackTypeString) {
-		return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
-	}
-
-	/**
-	 * Changes the feedback type of the iteration and allows the user to apply
-	 * co-transformations on the input and feedback stream, as in a
-	 * {@link ConnectedStreams}.
-	 *
-	 * <p>
-	 * For type safety the user needs to define the feedback type
-	 * 
-	 * @param feedbackTypeClass
-	 *            Class of the elements in the feedback stream.
-	 * @return A {@link ConnectedIterativeStreams}.
-	 */
-	public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
-		return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
-	}
-
-	/**
-	 * Changes the feedback type of the iteration and allows the user to apply
-	 * co-transformations on the input and feedback stream, as in a
-	 * {@link ConnectedStreams}.
-	 *
-	 * <p>
-	 * For type safety the user needs to define the feedback type
-	 * 
-	 * @param feedbackType
-	 *            The type information of the feedback stream.
-	 * @return A {@link ConnectedIterativeStreams}.
-	 */
-	public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
-		return new ConnectedIterativeStreams<T, F>(originalInput, feedbackType, maxWaitTime);
-	}
-	
-	/**
-	 * The {@link ConnectedIterativeStreams} represent a start of an
-	 * iterative part of a streaming program, where the original input of the
-	 * iteration and the feedback of the iteration are connected as in a
-	 * {@link ConnectedStreams}.
-	 *
-	 * <p>
-	 * The user can distinguish between the two inputs using co-transformation,
-	 * thus eliminating the need for mapping the inputs and outputs to a common
-	 * type.
-	 * 
-	 * @param <I>
-	 *            Type of the input of the iteration
-	 * @param <F>
-	 *            Type of the feedback of the iteration
-	 */
-	public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {
-
-		private CoFeedbackTransformation<F> coFeedbackTransformation;
-
-		public ConnectedIterativeStreams(DataStream<I> input,
-				TypeInformation<F> feedbackType,
-				long waitTime) {
-			super(input.getExecutionEnvironment(),
-					input,
-					new DataStream<F>(input.getExecutionEnvironment(),
-							new CoFeedbackTransformation<F>(input.getParallelism(),
-									feedbackType,
-									waitTime)));
-			this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
-		}
-
-		/**
-		 * Closes the iteration. This method defines the end of the iterative
-		 * program part that will be fed back to the start of the iteration as
-		 * the second input in the {@link ConnectedStreams}.
-		 * 
-		 * @param feedbackStream
-		 *            {@link DataStream} that will be used as second input to
-		 *            the iteration head.
-		 * @return The feedback stream.
-		 * 
-		 */
-		@SuppressWarnings({ "rawtypes", "unchecked" })
-		public DataStream<F> closeWith(DataStream<F> feedbackStream) {
-
-			Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
-
-			if (!predecessors.contains(this.coFeedbackTransformation)) {
-				throw new UnsupportedOperationException(
-						"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
-			}
-
-			coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());
-
-			return feedbackStream;
-		}
-		
-		private UnsupportedOperationException groupingException = new UnsupportedOperationException(
-				"Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
-		
-		@Override
-		public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
-
-		@Override
-		public ConnectedStreams<I, F> keyBy(String field1, String field2) {throw groupingException;}
-
-		@Override
-		public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {throw groupingException;}
-
-		@Override
-		public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
-
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
-		
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
-		
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
-		
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
-		
-		@Override
-		public ConnectedStreams<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
deleted file mode 100644
index cff9355..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- *{@code JoinedStreams} represents two {@link DataStream DataStreams} that have been joined.
- * A streaming join operation is evaluated over elements in a window.
- *
- * <p>
- * To finalize the join operation you also need to specify a {@link KeySelector} for
- * both the first and second input and a {@link WindowAssigner}.
- *
- * <p>
- * Note: Right now, the join is being evaluated in memory so you need to ensure that the number
- * of elements per key does not get too high. Otherwise the JVM might crash.
- *
- * <p>
- * Example:
- *
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> one = ...;
- * DataStream<Tuple2<String, Integer>> two = ...;
- *
- * DataStream<T> result = one.join(two)
- *     .where(new MyFirstKeySelector())
- *     .equalTo(new MyFirstKeySelector())
- *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
- *     .apply(new MyJoinFunction());
- * } </pre>
- */
-public class JoinedStreams<T1, T2> {
-
-	/** The first input stream */
-	private final DataStream<T1> input1;
-
-	/** The second input stream */
-	private final DataStream<T2> input2;
-
-	/**
-	 * Creates new JoinedStreams data streams, which are the first step towards building a streaming co-group.
-	 *
-	 * @param input1 The first data stream.
-	 * @param input2 The second data stream.
-	 */
-	public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
-		this.input1 = requireNonNull(input1);
-		this.input2 = requireNonNull(input2);
-	}
-
-	/**
-	 * Specifies a {@link KeySelector} for elements from the first input.
-	 */
-	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
-		TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
-		return new Where<>(input1.clean(keySelector), keyType);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Joined streams that have the key for one side defined.
-	 *
-	 * @param <KEY> The type of the key.
-	 */
-	public class Where<KEY> {
-
-		private final KeySelector<T1, KEY> keySelector1;
-		private final TypeInformation<KEY> keyType;
-
-		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
-			this.keySelector1 = keySelector1;
-			this.keyType = keyType;
-		}
-
-		/**
-		 * Specifies a {@link KeySelector} for elements from the second input.
-		 */
-		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
-			TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
-			if (!otherKey.equals(this.keyType)) {
-				throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
-						"first key = " + this.keyType + " , second key = " + otherKey);
-			}
-
-			return new EqualTo(input2.clean(keySelector));
-		}
-
-		// --------------------------------------------------------------------
-
-		/**
-		 * A join operation that has {@link KeySelector KeySelectors} defined for both inputs.
-		 */
-		public class EqualTo {
-
-			private final KeySelector<T2, KEY> keySelector2;
-
-			EqualTo(KeySelector<T2, KEY> keySelector2) {
-				this.keySelector2 = requireNonNull(keySelector2);
-			}
-
-			/**
-			 * Specifies the window on which the join operation works.
-			 */
-			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
-				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as
-	 * well as a {@link WindowAssigner}.
-	 *
-	 * @param <T1> Type of the elements from the first input
-	 * @param <T2> Type of the elements from the second input
-	 * @param <KEY> Type of the key. This must be the same for both inputs
-	 * @param <W> Type of {@link Window} on which the join operation works.
-	 */
-	public static class WithWindow<T1, T2, KEY, W extends Window> {
-		
-		private final DataStream<T1> input1;
-		private final DataStream<T2> input2;
-
-		private final KeySelector<T1, KEY> keySelector1;
-		private final KeySelector<T2, KEY> keySelector2;
-		private final TypeInformation<KEY> keyType;
-
-		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
-
-		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
-
-		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
-
-		protected WithWindow(DataStream<T1> input1,
-				DataStream<T2> input2,
-				KeySelector<T1, KEY> keySelector1,
-				KeySelector<T2, KEY> keySelector2,
-				TypeInformation<KEY> keyType,
-				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
-				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
-				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
-			
-			this.input1 = requireNonNull(input1);
-			this.input2 = requireNonNull(input2);
-
-			this.keySelector1 = requireNonNull(keySelector1);
-			this.keySelector2 = requireNonNull(keySelector2);
-			this.keyType = requireNonNull(keyType);
-			
-			this.windowAssigner = requireNonNull(windowAssigner);
-			
-			this.trigger = trigger;
-			this.evictor = evictor;
-		}
-
-		/**
-		 * Sets the {@code Trigger} that should be used to trigger window emission.
-		 */
-		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, newTrigger, evictor);
-		}
-
-		/**
-		 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
-		 *
-		 * <p>
-		 * Note: When using an evictor window performance will degrade significantly, since
-		 * pre-aggregation of window results cannot be used.
-		 */
-		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, trigger, newEvictor);
-		}
-
-		/**
-		 * Completes the join operation with the user function that is executed
-		 * for each combination of elements with the same key in a window.
-		 */
-		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
-			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
-					function,
-					JoinFunction.class,
-					true,
-					true,
-					input1.getType(),
-					input2.getType(),
-					"Join",
-					false);
-
-			return apply(function, resultType);
-		}
-
-		/**
-		 * Completes the join operation with the user function that is executed
-		 * for each combination of elements with the same key in a window.
-		 */
-		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
-			//clean the closure
-			function = input1.getExecutionEnvironment().clean(function);
-
-			return input1.coGroup(input2)
-					.where(keySelector1)
-					.equalTo(keySelector2)
-					.window(windowAssigner)
-					.trigger(trigger)
-					.evictor(evictor)
-					.apply(new FlatJoinCoGroupFunction<>(function), resultType);
-
-		}
-
-		/**
-		 * Completes the join operation with the user function that is executed
-		 * for each combination of elements with the same key in a window.
-		 */
-		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
-			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
-					function,
-					JoinFunction.class,
-					true,
-					true,
-					input1.getType(),
-					input2.getType(),
-					"Join",
-					false);
-
-			return apply(function, resultType);
-		}
-
-		/**
-		 * Completes the join operation with the user function that is executed
-		 * for each combination of elements with the same key in a window.
-		 */
-		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
-			//clean the closure
-			function = input1.getExecutionEnvironment().clean(function);
-
-			return input1.coGroup(input2)
-					.where(keySelector1)
-					.equalTo(keySelector2)
-					.window(windowAssigner)
-					.trigger(trigger)
-					.evictor(evictor)
-					.apply(new JoinCoGroupFunction<>(function), resultType);
-
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Implementation of the functions
-	// ------------------------------------------------------------------------
-
-	/**
-	 * CoGroup function that does a nested-loop join to get the join result.
-	 */
-	private static class JoinCoGroupFunction<T1, T2, T>
-			extends WrappingFunction<JoinFunction<T1, T2, T>>
-			implements CoGroupFunction<T1, T2, T> {
-		private static final long serialVersionUID = 1L;
-
-		public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
-			super(wrappedFunction);
-		}
-
-		@Override
-		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
-			for (T1 val1: first) {
-				for (T2 val2: second) {
-					out.collect(wrappedFunction.join(val1, val2));
-				}
-			}
-		}
-	}
-
-	/**
-	 * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
-	 */
-	private static class FlatJoinCoGroupFunction<T1, T2, T>
-			extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
-			implements CoGroupFunction<T1, T2, T> {
-		private static final long serialVersionUID = 1L;
-
-		public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
-			super(wrappedFunction);
-		}
-
-		@Override
-		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
-			for (T1 val1: first) {
-				for (T2 val2: second) {
-					wrappedFunction.join(val1, val2, out);
-				}
-			}
-		}
-	}
-
-}


Mime
View raw message