flink-commits mailing list archives

From mbala...@apache.org
Subject [04/36] flink git commit: [scala] [streaming] Finished scala StreamExecutionEnvironment functionality + DataStream sinks + docs
Date Wed, 07 Jan 2015 14:12:43 GMT
[scala] [streaming] Finished scala StreamExecutionEnvironment functionality + DataStream sinks + docs

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
	flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1b53b16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1b53b16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1b53b16

Branch: refs/heads/release-0.8
Commit: d1b53b16695231f54d912d847c11a406523c8d5b
Parents: 40efecf
Author: Gyula Fora <gyfora@apache.org>
Authored: Sat Dec 13 01:08:08 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Jan 5 17:51:45 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 337 ++++---------------
 .../environment/StreamExecutionEnvironment.java |  40 ++-
 .../api/function/sink/WriteFormatAsCsv.java     |   6 +-
 .../api/function/sink/WriteSinkFunction.java    |  15 +-
 .../sink/WriteSinkFunctionByBatches.java        |  49 ---
 .../sink/WriteSinkFunctionByMillis.java         |   7 +-
 .../flink/streaming/api/WriteAsCsvTest.java     | 176 ----------
 .../flink/streaming/api/WriteAsTextTest.java    | 177 ----------
 .../flink/api/scala/streaming/DataStream.scala  | 181 +++++++++-
 .../streaming/StreamExecutionEnvironment.scala  | 103 +++++-
 10 files changed, 389 insertions(+), 702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 1cf8d72..3a3cdc4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,9 +44,9 @@ import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.WriteFormat;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
-import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
 import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -275,7 +275,9 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output is
-	 * partitioned by the selected fields.
+	 * partitioned by the selected fields. This setting only affects how the
+	 * outputs will be distributed between the parallel instances of the next
+	 * processing operator.
 	 * 
 	 * @param fields
 	 *            The fields to partition by.
@@ -289,7 +291,9 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output is
-	 * partitioned by the given field expressions.
+	 * partitioned by the given field expressions. This setting only affects
+	 * how the outputs will be distributed between the parallel instances of the
+	 * next processing operator.
 	 * 
 	 * @param fields
 	 *            The fields expressions to partition by.
@@ -303,7 +307,9 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output is
-	 * partitioned using the given {@link KeySelector}.
+	 * partitioned using the given {@link KeySelector}. This setting only
+	 * affects how the outputs will be distributed between the parallel
+	 * instances of the next processing operator.
 	 * 
 	 * @param keySelector
 	 * @return
@@ -314,7 +320,9 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are broadcasted to every parallel instance of the next component.
+	 * are broadcasted to every parallel instance of the next component. This
+	 * setting only affects how the outputs will be distributed between the
+	 * parallel instances of the next processing operator.
 	 * 
 	 * @return The DataStream with broadcast partitioning set.
 	 */
@@ -324,7 +332,9 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are shuffled to the next component.
+	 * are shuffled to the next component. This setting only affects how the
+	 * outputs will be distributed between the parallel instances of the next
+	 * processing operator.
 	 * 
 	 * @return The DataStream with shuffle partitioning set.
 	 */
@@ -334,8 +344,10 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are forwarded to the local subtask of the next component. This is the
-	 * default partitioner setting.
+	 * are forwarded to the local subtask of the next component (whenever
+	 * possible). This is the default partitioner setting. This setting only
+	 * affects how the outputs will be distributed between the parallel
+	 * instances of the next processing operator.
 	 * 
 	 * @return The DataStream with shuffle partitioning set.
 	 */
@@ -345,7 +357,9 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are distributed evenly to the next component.
+	 * are distributed evenly to the next component. This setting only affects
+	 * how the outputs will be distributed between the parallel instances of
+	 * the next processing operator.
 	 * 
 	 * @return The DataStream with shuffle partitioning set.
 	 */
@@ -547,9 +561,9 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the sum of the pojo data stream at
-	 * the given field expression. A field expression is either the name of a
-	 * public field or a getter method with parentheses of the
+	 * Applies an aggregation that gives the current sum of the pojo data
+	 * stream at the given field expression. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
 	 * {@link DataStream}S underlying type. A dot can be used to drill down into
 	 * objects, as in {@code "field1.getInnerField2()" }.
 	 * 
@@ -563,8 +577,8 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the minimum of the data stream at
-	 * the given position.
+	 * Applies an aggregation that gives the current minimum of the data
+	 * stream at the given position.
 	 * 
 	 * @param positionToMin
 	 *            The position in the data point to minimize
@@ -577,9 +591,9 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the minimum of the pojo data
-	 * stream at the given field expression. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
+	 * Applies an aggregation that gives the current minimum of the pojo
+	 * data stream at the given field expression. A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
 	 * {@link DataStream}S underlying type. A dot can be used to drill down into
 	 * objects, as in {@code "field1.getInnerField2()" }.
 	 * 
@@ -594,8 +608,8 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that gives the maximum of the data stream at the
-	 * given position.
+	 * Applies an aggregation that gives the current maximum of the data stream
+	 * at the given position.
 	 * 
 	 * @param positionToMax
 	 *            The position in the data point to maximize
@@ -608,9 +622,9 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the maximum of the pojo data
-	 * stream at the given field expression. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
+	 * Applies an aggregation that gives the current maximum of the pojo
+	 * data stream at the given field expression. A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
 	 * {@link DataStream}S underlying type. A dot can be used to drill down into
 	 * objects, as in {@code "field1.getInnerField2()" }.
 	 * 
@@ -625,11 +639,11 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the minimum element of the pojo
-	 * data stream by the given field expression. A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * Applies an aggregation that gives the current minimum element of the
+	 * pojo data stream by the given field expression. A field expression is
+	 * either the name of a public field or a getter method with parentheses of
+	 * the {@link DataStream}S underlying type. A dot can be used to drill down
+	 * into objects, as in {@code "field1.getInnerField2()" }.
 	 * 
 	 * @param field
 	 *            The field expression based on which the aggregation will be
@@ -645,11 +659,11 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the maximum element of the pojo
-	 * data stream by the given field expression. A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * Applies an aggregation that gives the current maximum element of the
+	 * pojo data stream by the given field expression. A field expression is
+	 * either the name of a public field or a getter method with parentheses of
+	 * the {@link DataStream}S underlying type. A dot can be used to drill down
+	 * into objects, as in {@code "field1.getInnerField2()" }.
 	 * 
 	 * @param field
 	 *            The field expression based on which the aggregation will be
@@ -677,7 +691,7 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
-	
+
 	/**
 	 * Applies an aggregation that that gives the current element with the
 	 * minimum value at the given position, if more elements have the minimum
@@ -724,7 +738,7 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
-	
+
 	/**
 	 * Applies an aggregation that that gives the current element with the
 	 * maximum value at the given position, if more elements have the maximum
@@ -759,7 +773,8 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that gives the count of the values.
+	 * Creates a new DataStream containing the current number (count) of
+	 * received records.
 	 * 
 	 * @return The transformed DataStream.
 	 */
@@ -826,9 +841,8 @@ public class DataStream<OUT> {
 	 * @return The closed DataStream.
 	 */
 	public DataStreamSink<OUT> print() {
-		DataStream<OUT> inputStream = this.copy();
 		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
-		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType());
+		DataStreamSink<OUT> returnStream = addSink(printFunction);
 
 		return returnStream;
 	}
@@ -841,28 +855,13 @@ public class DataStream<OUT> {
 	 * @return The closed DataStream.
 	 */
 	public DataStreamSink<OUT> printToErr() {
-		DataStream<OUT> inputStream = this.copy();
 		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>(true);
-		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType());
+		DataStreamSink<OUT> returnStream = addSink(printFunction);
 
 		return returnStream;
 	}
 
 	/**
-	 * Writes a DataStream to the file specified by path in text format. For
-	 * every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * 
-	 * @return The closed DataStream
-	 */
-	public DataStreamSink<OUT> writeAsText(String path) {
-		return writeAsText(this, path, new WriteFormatAsText<OUT>(), 1, null);
-	}
-
-	/**
 	 * Writes a DataStream to the file specified by path in text format. The
 	 * writing is performed periodically, in every millis milliseconds. For
 	 * every element of the DataStream the result of {@link Object#toString()}
@@ -876,122 +875,7 @@ public class DataStream<OUT> {
 	 * @return The closed DataStream
 	 */
 	public DataStreamSink<OUT> writeAsText(String path, long millis) {
-		return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, null);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically in equally sized batches. For every
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param batchSize
-	 *            is the size of the batches, i.e. the number of tuples written
-	 *            to the file at a time
-	 * 
-	 * @return The closed DataStream
-	 */
-	public DataStreamSink<OUT> writeAsText(String path, int batchSize) {
-		return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, null);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically, in every millis milliseconds. For
-	 * every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param millis
-	 *            is the file update frequency
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            system time.
-	 * 
-	 * @return The closed DataStream
-	 */
-	public DataStreamSink<OUT> writeAsText(String path, long millis, OUT endTuple) {
-		return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, endTuple);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically in equally sized batches. For every
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param batchSize
-	 *            is the size of the batches, i.e. the number of tuples written
-	 *            to the file at a time
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            batchSize.
-	 * 
-	 * @return The closed DataStream
-	 */
-	public DataStreamSink<OUT> writeAsText(String path, int batchSize, OUT endTuple) {
-		return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, endTuple);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically, in every millis milliseconds. For
-	 * every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param millis
-	 *            is the file update frequency
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            system time.
-	 * 
-	 * @return the data stream constructed
-	 */
-	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
-			WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
-		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
-				path, format, millis, endTuple), inputStream.typeInfo);
-		return returnStream;
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically in equally sized batches. For every
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param batchSize
-	 *            is the size of the batches, i.e. the number of tuples written
-	 *            to the file at a time
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            batchSize.
-	 * 
-	 * @return the data stream constructed
-	 */
-	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
-			WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
-		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
-				inputStream.typeInfo);
-		return returnStream;
+		return writeAsText(path, new WriteFormatAsText<OUT>(), millis);
 	}
 
 	/**
@@ -1004,8 +888,8 @@ public class DataStream<OUT> {
 	 * 
 	 * @return The closed DataStream
 	 */
-	public DataStreamSink<OUT> writeAsCsv(String path) {
-		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 1, null);
+	public DataStreamSink<OUT> writeAsText(String path) {
+		return writeAsText(path, 0);
 	}
 
 	/**
@@ -1022,74 +906,28 @@ public class DataStream<OUT> {
 	 * @return The closed DataStream
 	 */
 	public DataStreamSink<OUT> writeAsCsv(String path, long millis) {
-		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, null);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically in equally sized batches. For every
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param batchSize
-	 *            is the size of the batches, i.e. the number of tuples written
-	 *            to the file at a time
-	 * 
-	 * @return The closed DataStream
-	 */
-	public DataStreamSink<OUT> writeAsCsv(String path, int batchSize) {
-		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, null);
+		if (!getType().isTupleType()) {
+			throw new RuntimeException("Only tuple data streams can be written in csv format");
+		}
+		return writeAsText(path, new WriteFormatAsCsv<OUT>(), millis);
 	}
 
 	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically, in every millis milliseconds. For
+	 * Writes a DataStream to the file specified by path in csv format. For
 	 * every element of the DataStream the result of {@link Object#toString()}
 	 * is written.
 	 * 
 	 * @param path
 	 *            is the path to the location where the tuples are written
-	 * @param millis
-	 *            is the file update frequency
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            system time.
 	 * 
 	 * @return The closed DataStream
 	 */
-	public DataStreamSink<OUT> writeAsCsv(String path, long millis, OUT endTuple) {
-		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, endTuple);
+	public DataStreamSink<OUT> writeAsCsv(String path) {
+		return writeAsCsv(path, 0);
 	}
 
 	/**
 	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically in equally sized batches. For every
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param batchSize
-	 *            is the size of the batches, i.e. the number of tuples written
-	 *            to the file at a time
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            batchSize.
-	 * 
-	 * @return The closed DataStream
-	 */
-	public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) {
-		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple);
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in csv format. The
 	 * writing is performed periodically, in every millis milliseconds. For
 	 * every element of the DataStream the result of {@link Object#toString()}
 	 * is written.
@@ -1098,45 +936,12 @@ public class DataStream<OUT> {
 	 *            is the path to the location where the tuples are written
 	 * @param millis
 	 *            is the file update frequency
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            system time.
-	 * 
-	 * @return the data stream constructed
-	 */
-	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
-			WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
-		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
-				path, format, millis, endTuple), inputStream.typeInfo);
-		return returnStream;
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in csv format. The
-	 * writing is performed periodically in equally sized batches. For every
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param batchSize
-	 *            is the size of the batches, i.e. the number of tuples written
-	 *            to the file at a time
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            batchSize.
 	 * 
 	 * @return the data stream constructed
 	 */
-	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
-			WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
-		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
-				inputStream.typeInfo);
+	private DataStreamSink<OUT> writeAsText(String path, WriteFormat<OUT> format, long millis) {
+		DataStreamSink<OUT> returnStream = addSink(new WriteSinkFunctionByMillis<OUT>(path, format,
+				millis));
 		return returnStream;
 	}
 
@@ -1241,21 +1046,13 @@ public class DataStream<OUT> {
 	 * @return The closed DataStream.
 	 */
 	public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
-		return addSink(this.copy(), sinkFunction);
-	}
-
-	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) {
-		return addSink(inputStream, sinkFunction, getType());
-	}
 
-	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
-			SinkFunction<OUT> sinkFunction, TypeInformation<OUT> inTypeInfo) {
 		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", typeInfo);
 
 		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
-				clean(sinkFunction)), inTypeInfo, null, "sink", degreeOfParallelism);
+				clean(sinkFunction)), getType(), null, "sink", degreeOfParallelism);
 
-		inputStream.connectGraph(inputStream.copy(), returnStream.getId(), 0);
+		this.connectGraph(this.copy(), returnStream.getId(), 0);
 
 		return returnStream;
 	}
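
As a usage illustration (not part of the commit): a minimal sketch of how the consolidated sink API reads after this change. The anonymous source, the file paths and the final env.execute() call are assumptions made only for the example; the writeAsText/writeAsCsv signatures are the ones introduced above.

    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.function.source.SourceFunction;
    import org.apache.flink.util.Collector;

    public class SinkApiSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

            // Illustrative source emitting a handful of Tuple1<Integer> records.
            DataStream<Tuple1<Integer>> stream = env.addSource(new SourceFunction<Tuple1<Integer>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void invoke(Collector<Tuple1<Integer>> out) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        out.collect(new Tuple1<Integer>(i));
                    }
                }
            });

            // writeAsText(path) now delegates to writeAsText(path, 0); a positive millis
            // value (here 100) batches the file updates by time instead.
            stream.writeAsText("/tmp/sketch.txt", 100);

            // Only tuple streams may be written as CSV; other types throw a RuntimeException.
            stream.writeAsCsv("/tmp/sketch.csv");

            env.execute();
        }
    }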

http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e9581a1..5bf2164 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -212,7 +212,8 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Creates a DataStream that represents the Strings produced by reading the
 	 * given file line wise multiple times(infinite). The file will be read with
-	 * the system's default character set.
+	 * the system's default character set. This functionality can be used for
+	 * testing a topology.
 	 * 
 	 * @param filePath
 	 *            The path of the file, as a URI (e.g.,
@@ -350,8 +351,17 @@ public abstract class StreamExecutionEnvironment {
 		return addSource(new GenSequenceFunction(from, to));
 	}
 
+	private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
+			TypeInformation<String> typeInfo) {
+		FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
+		DataStreamSource<String> returnStream = addSource(function);
+		jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat);
+		return returnStream;
+	}
+
 	/**
-	 * Ads a data source thus opening a {@link DataStream}.
+	 * Creates a DataStream using a user defined source function for arbitrary
+	 * source functionality.
 	 * 
 	 * @param function
 	 *            the user defined function
@@ -371,11 +381,27 @@ public abstract class StreamExecutionEnvironment {
 		return returnStream;
 	}
 
-	private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
-			TypeInformation<String> typeInfo) {
-		FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
-		DataStreamSource<String> returnStream = addSource(function);
-		jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat);
+	/**
+	 * Adds a data source with custom type information, thus opening a
+	 * {@link DataStream}. Only in very special cases does the user need to
+	 * supply type information. Otherwise use
+	 * {@link #addSource(SourceFunction)}
+	 * 
+	 * @param function
+	 *            the user defined function
+	 * @param outTypeInfo
+	 *            the user defined type information for the stream
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return the data stream constructed
+	 */
+	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
+			TypeInformation<OUT> outTypeInfo) {
+
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
+
+		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
+				null, outTypeInfo, "source", 1);
 
 		return returnStream;
 	}
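
A short sketch of the new addSource(SourceFunction, TypeInformation) variant added here. The BasicTypeInfo constant and its import path are assumptions from memory of the 0.8 code base; in the common case, addSource(SourceFunction) with automatic type extraction is enough.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.function.source.SourceFunction;
    import org.apache.flink.util.Collector;

    public class AddSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

            SourceFunction<String> source = new SourceFunction<String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void invoke(Collector<String> out) throws Exception {
                    out.collect("hello");
                    out.collect("world");
                }
            };

            // Supplying the output type explicitly bypasses type extraction; useful when
            // the element type cannot be inferred from the source function itself.
            DataStreamSource<String> stream = env.addSource(source, BasicTypeInfo.STRING_TYPE_INFO);

            stream.print();
            env.execute();
        }
    }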

http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
index ebc38bb..b22fd80 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
@@ -25,7 +25,7 @@ import java.util.ArrayList;
 
 /**
  * Writes tuples in csv format.
- *
+ * 
  * @param <IN>
  *            Input tuple type
  */
@@ -37,8 +37,8 @@ public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
 		try {
 			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
 			for (IN tupleToWrite : tupleList) {
-				outStream.println(tupleToWrite.toString().substring(1,
-						tupleToWrite.toString().length() - 1));
+				String strTuple = tupleToWrite.toString();
+				outStream.println(strTuple.substring(1, strTuple.length() - 1));
 			}
 			outStream.close();
 		} catch (IOException e) {
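
To make the refactored write loop concrete (values invented for illustration, assuming org.apache.flink.api.java.tuple.Tuple2 is imported): the CSV format simply strips the outer parentheses of the tuple's toString().

    Tuple2<String, Integer> t = new Tuple2<String, Integer>("foo", 3);
    String strTuple = t.toString();                                 // "(foo,3)"
    String csvLine = strTuple.substring(1, strTuple.length() - 1);  // "foo,3"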

http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
index 01298b0..0c52afc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
@@ -35,13 +35,11 @@ public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
 
 	protected final String path;
 	protected ArrayList<IN> tupleList = new ArrayList<IN>();
-	protected final IN endTuple;
 	protected WriteFormat<IN> format;
 
-	public WriteSinkFunction(String path, WriteFormat<IN> format, IN endTuple) {
+	public WriteSinkFunction(String path, WriteFormat<IN> format) {
 		this.path = path;
 		this.format = format;
-		this.endTuple = endTuple;
 		cleanFile(path);
 	}
 
@@ -82,16 +80,13 @@ public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
 	 */
 	@Override
 	public void invoke(IN tuple) {
-		if (!tuple.equals(endTuple)) {
-			tupleList.add(tuple);
-			if (updateCondition()) {
-				format.write(path, tupleList);
-				resetParameters();
-			}
-		} else {
+
+		tupleList.add(tuple);
+		if (updateCondition()) {
 			format.write(path, tupleList);
 			resetParameters();
 		}
+
 	}
 
 }
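
With endTuple gone, a subclass now only defines when the buffered tupleList is flushed. A hypothetical count-based variant, sketched here purely to show the updateCondition()/resetParameters() contract (it mirrors the removed WriteSinkFunctionByBatches minus the end-tuple handling and is not part of this commit):

    // Assumed to live in org.apache.flink.streaming.api.function.sink, next to WriteSinkFunction.
    public class WriteSinkFunctionByCount<IN> extends WriteSinkFunction<IN> {
        private static final long serialVersionUID = 1L;

        private final int count;

        public WriteSinkFunctionByCount(String path, WriteFormat<IN> format, int count) {
            super(path, format);
            this.count = count;
        }

        @Override
        protected boolean updateCondition() {
            // invoke() buffers each record into tupleList and calls format.write(path, tupleList)
            // whenever this returns true.
            return tupleList.size() >= count;
        }

        @Override
        protected void resetParameters() {
            tupleList.clear();
        }
    }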

http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
deleted file mode 100644
index 5012e25..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-
-/**
- * Implementation of WriteSinkFunction. Writes tuples to file in equally sized
- * batches.
- *
- * @param <IN>
- *            Input tuple type
- */
-public class WriteSinkFunctionByBatches<IN> extends WriteSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private final int batchSize;
-
-	public WriteSinkFunctionByBatches(String path, WriteFormat<IN> format, int batchSize,
-			IN endTuple) {
-		super(path, format, endTuple);
-		this.batchSize = batchSize;
-	}
-
-	@Override
-	protected boolean updateCondition() {
-		return tupleList.size() >= batchSize;
-	}
-
-	@Override
-	protected void resetParameters() {
-		tupleList.clear();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index 1fa978e..ee6df94 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -17,11 +17,10 @@
 
 package org.apache.flink.streaming.api.function.sink;
 
-
 /**
  * Implementation of WriteSinkFunction. Writes tuples to file in every millis
  * milliseconds.
- *
+ * 
  * @param <IN>
  *            Input tuple type
  */
@@ -31,8 +30,8 @@ public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
 	private final long millis;
 	private long lastTime;
 
-	public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis, IN endTuple) {
-		super(path, format, endTuple);
+	public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
+		super(path, format);
 		this.millis = millis;
 		lastTime = System.currentTimeMillis();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
deleted file mode 100644
index 6b1dd5a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class WriteAsCsvTest {
-	
-	private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsCsvTest.class.getSimpleName() + "_";
-	
-	private static final long MEMORYSIZE = 32;
-
-	private static List<String> result1 = new ArrayList<String>();
-	private static List<String> result2 = new ArrayList<String>();
-	private static List<String> result3 = new ArrayList<String>();
-	private static List<String> result4 = new ArrayList<String>();
-	private static List<String> result5 = new ArrayList<String>();
-
-	private static List<String> expected1 = new ArrayList<String>();
-	private static List<String> expected2 = new ArrayList<String>();
-	private static List<String> expected3 = new ArrayList<String>();
-	private static List<String> expected4 = new ArrayList<String>();
-	private static List<String> expected5 = new ArrayList<String>();
-
-	public static final class MySource1 implements SourceFunction<Tuple1<Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
-			for (int i = 0; i < 27; i++) {
-				collector.collect(new Tuple1<Integer>(i));
-			}
-		}
-	}
-
-	private static void readFile(String path, List<String> result) {
-		try {
-			BufferedReader br = new BufferedReader(new FileReader(path));
-			String line;
-			line = br.readLine();
-			while (line != null) {
-				result.add(line);
-				line = br.readLine();
-			}
-			br.close();
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
-	}
-
-	private static void fillExpected1() {
-		for (int i = 0; i < 27; i++) {
-			expected1.add(i + "");
-		}
-	}
-
-	private static void fillExpected2() {
-		for (int i = 0; i < 25; i++) {
-			expected2.add(i + "");
-		}
-	}
-
-	private static void fillExpected3() {
-		for (int i = 0; i < 20; i++) {
-			expected3.add(i + "");
-		}
-	}
-
-	private static void fillExpected4() {
-		for (int i = 0; i < 26; i++) {
-			expected4.add(i + "");
-		}
-	}
-
-	private static void fillExpected5() {
-		for (int i = 0; i < 14; i++) {
-			expected5.add(i + "");
-		}
-
-		for (int i = 15; i < 25; i++) {
-			expected5.add(i + "");
-		}
-	}
-
-	@BeforeClass
-	public static void createFileCleanup() {
-		Runnable r = new Runnable() {
-			
-			@Override
-			public void run() {
-				try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {}
-				try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {}
-				try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {}
-				try { new File(PREFIX + "test4.txt").delete(); } catch (Throwable t) {}
-				try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {}
-			}
-		};
-		
-		Runtime.getRuntime().addShutdownHook(new Thread(r));
-	}
-	
-	@Test
-	public void test() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test1.txt");
-
-		fillExpected1();
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test2.txt", 5);
-
-		fillExpected2();
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test3.txt", 10);
-
-		fillExpected3();
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
-
-		fillExpected4();
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
-
-		fillExpected5();
-
-		env.executeTest(MEMORYSIZE);
-
-		readFile(PREFIX + "test1.txt", result1);
-		readFile(PREFIX + "test2.txt", result2);
-		readFile(PREFIX + "test3.txt", result3);
-		readFile(PREFIX + "test4.txt", result4);
-		readFile(PREFIX + "test5.txt", result5);
-
-		assertTrue(expected1.equals(result1));
-		assertTrue(expected2.equals(result2));
-		assertTrue(expected3.equals(result3));
-		assertTrue(expected4.equals(result4));
-		assertTrue(expected5.equals(result5));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
deleted file mode 100644
index e21f21d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class WriteAsTextTest {
-	
-	private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsTextTest.class.getSimpleName() + "_";
-	
-	private static final long MEMORYSIZE = 32;
-
-	private static List<String> result1 = new ArrayList<String>();
-	private static List<String> result2 = new ArrayList<String>();
-	private static List<String> result3 = new ArrayList<String>();
-	private static List<String> result4 = new ArrayList<String>();
-	private static List<String> result5 = new ArrayList<String>();
-
-	private static List<String> expected1 = new ArrayList<String>();
-	private static List<String> expected2 = new ArrayList<String>();
-	private static List<String> expected3 = new ArrayList<String>();
-	private static List<String> expected4 = new ArrayList<String>();
-	private static List<String> expected5 = new ArrayList<String>();
-
-	public static final class MySource1 implements SourceFunction<Tuple1<Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
-			for (int i = 0; i < 27; i++) {
-				collector.collect(new Tuple1<Integer>(i));
-			}
-		}
-	}
-
-	private static void readFile(String path, List<String> result) {
-		try {
-			BufferedReader br = new BufferedReader(new FileReader(path));
-			String line;
-			line = br.readLine();
-			while (line != null) {
-				result.add(line);
-				line = br.readLine();
-			}
-			br.close();
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
-	}
-
-	private static void fillExpected1() {
-		for (int i = 0; i < 27; i++) {
-			expected1.add("(" + i + ")");
-		}
-	}
-
-	private static void fillExpected2() {
-		for (int i = 0; i < 25; i++) {
-			expected2.add("(" + i + ")");
-		}
-	}
-
-	private static void fillExpected3() {
-		for (int i = 0; i < 20; i++) {
-			expected3.add("(" + i + ")");
-		}
-	}
-
-	private static void fillExpected4() {
-		for (int i = 0; i < 26; i++) {
-			expected4.add("(" + i + ")");
-		}
-	}
-
-	private static void fillExpected5() {
-		for (int i = 0; i < 14; i++) {
-			expected5.add("(" + i + ")");
-		}
-
-		for (int i = 15; i < 25; i++) {
-			expected5.add("(" + i + ")");
-		}
-	}
-	
-	@BeforeClass
-	public static void createFileCleanup() {
-		Runnable r = new Runnable() {
-			
-			@Override
-			public void run() {
-				try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {}
-				try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {}
-				try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {}
-				try { new File(PREFIX + "test4.txt").delete(); } catch (Throwable t) {}
-				try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {}
-			}
-		};
-		
-		Runtime.getRuntime().addShutdownHook(new Thread(r));
-	}
-
-	@Test
-	public void test() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsText(PREFIX + "test1.txt");
-
-		fillExpected1();
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsText(PREFIX + "test2.txt", 5);
-
-		fillExpected2();
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsText(PREFIX + "test3.txt", 10);
-
-		fillExpected3();
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
-
-		fillExpected4();
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
-
-		fillExpected5();
-
-		env.executeTest(MEMORYSIZE);
-
-		readFile(PREFIX + "test1.txt", result1);
-		readFile(PREFIX + "test2.txt", result2);
-		readFile(PREFIX + "test3.txt", result3);
-		readFile(PREFIX + "test4.txt", result4);
-		readFile(PREFIX + "test5.txt", result5);
-
-		assertEquals(expected1,result1);
-		assertEquals(expected2,result2);
-		assertEquals(expected3,result3);
-		assertEquals(expected4,result4);
-		assertEquals(expected5,result5);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index b10bdc6..e96f5eb 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.GroupedDataStream
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.streaming.api.function.sink.SinkFunction
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -86,15 +87,36 @@ class DataStream[T](javaStream: JavaStream[T]) {
         "parallelism.")
   }
 
+  /**
+   * Creates a new DataStream by merging DataStream outputs of
+   * the same type with each other. The DataStreams merged using this operator
+   * will be transformed simultaneously.
+   *
+   */
   def merge(dataStreams: DataStream[T]*): DataStream[T] =
     new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*))
 
+  /**
+   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
   def groupBy(fields: Int*): DataStream[T] =
     new DataStream[T](javaStream.groupBy(fields: _*))
 
+  /**
+   * Groups the elements of a DataStream by the given field expressions to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
   def groupBy(firstField: String, otherFields: String*): DataStream[T] =
     new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*))
 
+  /**
+   * Groups the elements of a DataStream by the key extracted with the given function, to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
   def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
 
     val keyExtractor = new KeySelector[T, K] {
@@ -104,12 +126,27 @@ class DataStream[T](javaStream: JavaStream[T]) {
     new DataStream[T](javaStream.groupBy(keyExtractor))
   }
 
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the selected fields. This setting only affects how the outputs will be distributed between the parallel instances of the next processing operator.
+   *
+   */
   def partitionBy(fields: Int*): DataStream[T] =
     new DataStream[T](javaStream.partitionBy(fields: _*))
 
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the selected fields. This setting only affects how the outputs will be distributed between the parallel instances of the next processing operator.
+   *
+   */
   def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
     new DataStream[T](javaStream.partitionBy(firstField +: otherFields.toArray: _*))
 
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the given key. This setting only affects how the outputs will be distributed between the parallel instances of the next processing operator.
+   *
+   */
   def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
 
     val keyExtractor = new KeySelector[T, K] {
@@ -119,56 +156,124 @@ class DataStream[T](javaStream: JavaStream[T]) {
     new DataStream[T](javaStream.partitionBy(keyExtractor))
   }
 
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are broadcasted to every parallel instance of the next component. This
+   * setting only affects how the outputs will be distributed between the
+   * parallel instances of the next processing operator.
+   *
+   */
   def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast())
 
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are shuffled to the next component. This setting only affects how the
+   * outputs will be distributed between the parallel instances of the next
+   * processing operator.
+   *
+   */
   def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle())
 
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are forwarded to the local subtask of the next component (whenever
+   * possible). This is the default partitioner setting. This setting only
+   * affects how the outputs will be distributed between the parallel
+   * instances of the next processing operator.
+   *
+   */
   def forward: DataStream[T] = new DataStream[T](javaStream.forward())
 
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are distributed evenly to the next component. This setting only affects
+   * how the outputs will be distributed between the parallel instances of
+   * the next processing operator.
+   *
+   */
   def distribute: DataStream[T] = new DataStream[T](javaStream.distribute())
 
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given position.
+   *
+   */
   def max(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.max(field))
     case field: String => return new DataStream[T](javaStream.max(field))
     case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given position.
+   *
+   */
   def min(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.min(field))
     case field: String => return new DataStream[T](javaStream.min(field))
     case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that sums the data stream at the given position.
+   *
+   */
   def sum(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.sum(field))
     case field: String => return new DataStream[T](javaStream.sum(field))
     case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given position. In case of a tie, returns the first element.
+   *
+   */
   def maxBy(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.maxBy(field))
     case field: String => return new DataStream[T](javaStream.maxBy(field))
     case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given position. In case of a tie, returns the first element.
+   *
+   */
   def minBy(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.minBy(field))
     case field: String => return new DataStream[T](javaStream.minBy(field))
     case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given position. In case of equal values, the first parameter decides whether the
+   * first or the last element with the minimal value is returned.
+   *
+   */
   def minBy(field: Any, first: Boolean): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.minBy(field, first))
     case field: String => return new DataStream[T](javaStream.minBy(field, first))
     case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given position. In case of equal values, the first parameter decides whether the
+   * first or the last element with the maximal value is returned.
+   *
+   */
   def maxBy(field: Any, first: Boolean): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.maxBy(field, first))
     case field: String => return new DataStream[T](javaStream.maxBy(field, first))
     case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Creates a new DataStream containing the current number (count) of
+   * received records.
+   *
+   */
   def count: DataStream[java.lang.Long] = new DataStream[java.lang.Long](javaStream.count())
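
Illustrative sketch of the aggregations above (not part of this commit; it assumes an environment env, a varargs fromElements as in the batch API, an implicit TypeInformation in scope, and that positional aggregation is supported for the Scala tuple stream used here):

    val pairs = env.fromElements(("a", 1), ("b", 3), ("a", 2))
    pairs.sum(1).print()           // running sum of the Int field
    pairs.max(1).print()           // running maximum of the Int field
    pairs.maxBy(1).print()         // element holding the current maximum; ties keep the first
    pairs.minBy(1, false).print()  // element holding the current minimum; ties keep the last
    pairs.count.print()            // running record count as java.lang.Long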
 
   /**
@@ -239,7 +344,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce
    * function.
    */
   def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
@@ -253,7 +358,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce
    * function.
    */
   def reduce(fun: (T, T) => T): DataStream[T] = {
@@ -268,7 +373,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new DataSet that contains only the elements satisfying the given filter predicate.
+   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
    */
   def filter(filter: FilterFunction[T]): DataStream[T] = {
     if (filter == null) {
@@ -277,6 +382,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
     new DataStream[T](javaStream.filter(filter))
   }
 
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
+   */
   def filter(fun: T => Boolean): DataStream[T] = {
     if (fun == null) {
       throw new NullPointerException("Filter function must not be null.")
@@ -288,6 +396,71 @@ class DataStream[T](javaStream: JavaStream[T]) {
     this.filter(filter)
   }
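
Illustrative sketch of the lambda variants of filter and reduce (not part of this commit; host and port are placeholders and env is assumed):

    val lines = env.socketTextStream("localhost", 9999)
    val longLines = lines.filter(line => line.length > 20)   // keep only long lines
    val joined = longLines.reduce((a, b) => a + " " + b)     // running concatenation
    joined.print()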
 
-  def print() = javaStream.print()
+  /**
+   * Writes a DataStream to the standard output stream (stdout). For each
+   * element of the DataStream the result of .toString is
+   * written.
+   *
+   */
+  def print(): DataStream[T] = new DataStream[T](javaStream.print())
+
+  /**
+   * Writes a DataStream to the file specified by path in text format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsText(path: String, millis: Long): DataStream[T] = new DataStream[T](javaStream.writeAsText(path, millis))
+
+  /**
+   * Writes a DataStream to the file specified by path in text format.
+   * For every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsText(path: String): DataStream[T] = new DataStream[T](javaStream.writeAsText(path))
+
+  /**
+   * Writes a DataStream to the file specified by path in csv format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsCsv(path: String, millis: Long): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path, millis))
+
+  /**
+   * Writes a DataStream to the file specified by path in csv format.
+   * For every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsCsv(path: String): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path))
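
Illustrative sketch of the file sinks above (not part of this commit; the paths are placeholders, env and a varargs fromElements are assumed, and CSV output is assumed to expect tuple-like elements):

    val lines = env.readTextFile("/tmp/input.txt")
    lines.writeAsText("/tmp/out.txt")            // each element's toString, one per line
    lines.writeAsText("/tmp/out_1s.txt", 1000L)  // same, flushed roughly every second

    val pairs = env.fromElements(("a", 1), ("b", 2))
    pairs.writeAsCsv("/tmp/out.csv")             // tuple fields written comma-separated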
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   *
+   */
+  def addSink(sinkFunction: SinkFunction[T]): DataStream[T] = new DataStream[T](javaStream.addSink(sinkFunction))
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   *
+   */
+  def addSink(fun: T => Unit): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Sink function must not be null.")
+    }
+    val sinkFunction = new SinkFunction[T] {
+      val cleanFun = clean(fun)
+      def invoke(in: T) = cleanFun(in)
+    }
+    this.addSink(sinkFunction)
+  }
 
 }
\ No newline at end of file
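
Illustrative sketch of attaching a custom sink through the lambda variant of addSink (not part of this commit; host, port and the job name are placeholders):

    val lines = env.socketTextStream("localhost", 9999)
    lines.addSink(line => println("received: " + line))   // side-effecting sink
    env.execute("socket sink example")                     // nothing runs until execute() is called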

http://git-wip-us.apache.org/repos/asf/flink/blob/d1b53b16/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index df6c561..e4a7b48 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -25,6 +25,7 @@ import scala.reflect.ClassTag
 import org.apache.flink.streaming.api.datastream.DataStreamSource
 import org.apache.flink.streaming.api.invokable.SourceInvokable
 import org.apache.flink.streaming.api.function.source.FromElementsFunction
+import org.apache.flink.streaming.api.function.source.SourceFunction
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -44,10 +45,78 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
 
+  /**
+   * Sets the maximum time frequency (milliseconds) for the flushing of the
+   * output buffers. By default the output buffers flush frequently to provide
+   * low latency and to aid a smooth developer experience. Setting the parameter
+   * can result in three logical modes:
+   *
+   * <ul>
+   * <li>
+   * A positive value triggers flushing periodically after that many milliseconds</li>
+   * <li>
+   * 0 triggers flushing after every record thus minimizing latency</li>
+   * <li>
+   * -1 triggers flushing only when the output buffer is full thus maximizing
+   * throughput</li>
+   * </ul>
+   *
+   */
+  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
+    javaEnv.setBufferTimeout(timeoutMillis)
+    this
+  }
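
Illustrative sketch of the three buffer-timeout modes described above (not part of this commit; env is assumed):

    env.setBufferTimeout(100)   // flush output buffers at least every 100 ms
    env.setBufferTimeout(0)     // flush after every record: lowest latency
    env.setBufferTimeout(-1)    // flush only when a buffer is full: highest throughput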
+
+  /**
+   * Gets the default buffer timeout set for this environment.
+   */
+  def getBufferTimeout: Long = javaEnv.getBufferTimeout()
+
+  /**
+   * Creates a DataStream that represents the Strings produced by reading the
+   * given file line by line. The file will be read with the system's default
+   * character set.
+   *
+   */
+  def readTextFile(filePath: String): DataStream[String] =
+    new DataStream[String](javaEnv.readTextFile(filePath))
+
+  /**
+   * Creates a DataStream that represents the Strings produced by reading the
+   * given file line by line multiple times (infinitely). The file will be read with
+   * the system's default character set. This functionality can be used for
+   * testing a topology.
+   *
+   */
+  def readTextStream(streamPath: String): DataStream[String] =
+    new DataStream[String](javaEnv.readTextStream(streamPath))
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from the socket. Received strings are decoded by the system's default
+   * character set.
+   *
+   */
+  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
+    new DataStream[String](javaEnv.socketTextStream(hostname, port, delimiter))
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from the socket. Received strings are decoded by the system's default
+   * character set, using '\n' as the delimiter.
+   *
+   */
+  def socketTextStream(hostname: String, port: Int): DataStream[String] =
+    new DataStream[String](javaEnv.socketTextStream(hostname, port))
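
Illustrative sketch of the textual sources above (not part of this commit; paths, host and port are placeholders and env is assumed):

    val fileLines   = env.readTextFile("/tmp/input.txt")        // read the file once, line by line
    val replayed    = env.readTextStream("/tmp/input.txt")      // re-read it indefinitely, useful for testing
    val socketLines = env.socketTextStream("localhost", 9999)   // '\n'-delimited text from a socket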
+
+  /**
+   * Creates a new DataStream that contains a sequence of numbers.
+   *
+   */
   def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = new DataStream(javaEnv.generateSequence(from, to))
 
   /**
-   * Creates a new data stream that contains the given elements. The elements must all be of the
+   * Creates a DataStream that contains the given elements. The elements must all be of the
    * same type and must be serializable.
    *
   * Note that this operation will result in a non-parallel data source, i.e. a data source with
@@ -78,8 +147,38 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     new DataStream(returnStream)
   }
 
+  /**
+   * Creates a DataStream using a user-defined source function for arbitrary
+   * source functionality.
+   *
+   */
+  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
+    Validate.notNull(function, "Function must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+    new DataStream[T](javaEnv.addSource(function, typeInfo))
+  }
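
Illustrative sketch of a user-defined source (not part of this commit; it assumes the 0.8-era SourceFunction interface with an invoke(Collector[T]) method and an implicit TypeInformation provided by org.apache.flink.api.scala._):

    import org.apache.flink.api.scala._   // assumed: brings the implicit TypeInformation into scope
    import org.apache.flink.streaming.api.function.source.SourceFunction
    import org.apache.flink.util.Collector

    val ticker = new SourceFunction[String] {
      // assumed signature: emits elements through the collector when the source runs
      override def invoke(out: Collector[String]): Unit = {
        for (i <- 1 to 10) out.collect("tick " + i)
      }
    }
    val ticks = env.addSource(ticker)   // DataStream[String]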
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   * <p>
+   * The program execution will be logged and displayed with a generated
+   * default name.
+   *
+   */
   def execute() = javaEnv.execute()
 
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   * <p>
+   * The program execution will be logged and displayed with the provided name.
+   *
+   */
+  def execute(jobName: String) = javaEnv.execute(jobName)
+
 }
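
Finally, an end-to-end sketch (not part of this commit; createLocalEnvironment is assumed to exist on the companion object, mirroring the batch API, and the job name is arbitrary):

    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.generateSequence(1, 10).print()
    env.execute("sequence printing job")   // the given name is used when logging/displaying the execution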
 
 object StreamExecutionEnvironment {
@@ -108,7 +207,7 @@ object StreamExecutionEnvironment {
    * Creates a remote execution environment. The remote environment sends (parts of) the program to
    * a cluster for execution. Note that all file paths used in the program must be accessible from
    * the cluster. The execution will use the cluster's default degree of parallelism, unless the
-   * parallelism is set explicitly via [[ExecutionEnvironment.setDegreeOfParallelism()]].
+   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
    *
    * @param host The host name or address of the master (JobManager),
    *             where the program should be executed.

