flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [02/51] [abbrv] git commit: [streaming] Package refactor & cleanup
Date Mon, 18 Aug 2014 17:25:39 GMT
[streaming] Package refactor & cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/60f632a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/60f632a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/60f632a3

Branch: refs/heads/master
Commit: 60f632a305f7019b638088f0a6d88ed2b42b267f
Parents: c79b48b
Author: mbalassi <balassi.marton@gmail.com>
Authored: Fri Jul 18 12:36:04 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 18 16:14:12 2014 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaSink.java   |   2 +-
 .../streaming/connectors/kafka/KafkaSource.java |   3 +-
 .../connectors/kafka/KafkaTopology.java         |   3 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |   2 +-
 .../connectors/rabbitmq/RMQSource.java          |   2 +-
 .../streaming/connectors/rabbitmq/RMQTest.java  |   2 +-
 .../apache/flink/streaming/api/DataStream.java  |   6 +-
 .../api/StreamExecutionEnvironment.java         |  22 ++--
 .../api/function/FileSourceFunction.java        |  53 ----------
 .../api/function/FileStreamFunction.java        |  54 ----------
 .../api/function/FromElementsFunction.java      |  50 ---------
 .../api/function/GenSequenceFunction.java       |  50 ---------
 .../api/function/PrintSinkFunction.java         |  39 -------
 .../streaming/api/function/SinkFunction.java    |  33 ------
 .../streaming/api/function/SourceFunction.java  |  30 ------
 .../streaming/api/function/WriteFormat.java     |  47 ---------
 .../api/function/WriteFormatAsCsv.java          |  53 ----------
 .../api/function/WriteFormatAsText.java         |  51 ----------
 .../api/function/WriteSinkFunction.java         | 101 -------------------
 .../function/WriteSinkFunctionByBatches.java    |  52 ----------
 .../api/function/WriteSinkFunctionByMillis.java |  54 ----------
 .../api/function/sink/PrintSinkFunction.java    |  39 +++++++
 .../api/function/sink/SinkFunction.java         |  33 ++++++
 .../api/function/sink/WriteFormat.java          |  47 +++++++++
 .../api/function/sink/WriteFormatAsCsv.java     |  53 ++++++++++
 .../api/function/sink/WriteFormatAsText.java    |  51 ++++++++++
 .../api/function/sink/WriteSinkFunction.java    | 101 +++++++++++++++++++
 .../sink/WriteSinkFunctionByBatches.java        |  52 ++++++++++
 .../sink/WriteSinkFunctionByMillis.java         |  54 ++++++++++
 .../api/function/source/FileSourceFunction.java |  53 ++++++++++
 .../api/function/source/FileStreamFunction.java |  54 ++++++++++
 .../function/source/FromElementsFunction.java   |  50 +++++++++
 .../function/source/GenSequenceFunction.java    |  50 +++++++++
 .../api/function/source/SourceFunction.java     |  30 ++++++
 .../streaming/api/invokable/SinkInvokable.java  |   3 +-
 .../AbstractStreamComponent.java                |   2 +-
 .../apache/flink/streaming/api/IterateTest.java |   2 +-
 .../flink/streaming/api/WriteAsCsvTest.java     |  21 +++-
 .../flink/streaming/api/WriteAsTextTest.java    |   6 +-
 .../api/collector/DirectedOutputTest.java       |   2 +-
 .../api/invokable/operator/BatchReduceTest.java |   4 +-
 .../api/invokable/operator/FlatMapTest.java     |   2 +-
 .../api/invokable/operator/MapTest.java         |   4 +-
 .../streamcomponent/StreamComponentTest.java    |   5 +-
 .../examples/basictopology/BasicTopology.java   |   3 +-
 .../examples/cellinfo/CellInfoLocal.java        |   3 +-
 .../CollaborativeFilteringSink.java             |   3 +-
 .../CollaborativeFilteringSource.java           |   3 +-
 .../examples/iterative/kmeans/KMeansSink.java   |   3 +-
 .../examples/iterative/kmeans/KMeansSource.java |   3 +-
 .../iterative/pagerank/PageRankSink.java        |   3 +-
 .../iterative/pagerank/PageRankSource.java      |   2 +-
 .../examples/iterative/sssp/SSSPSink.java       |   3 +-
 .../examples/iterative/sssp/SSSPSource.java     |   3 +-
 .../flink/streaming/examples/join/JoinSink.java |   3 +-
 .../streaming/examples/join/JoinSourceOne.java  |   3 +-
 .../streaming/examples/join/JoinSourceTwo.java  |   3 +-
 .../ml/IncrementalLearningSkeleton.java         |   3 +-
 .../streaming/examples/ml/IncrementalOLS.java   |   3 +-
 .../window/join/WindowJoinSourceOne.java        |   3 +-
 .../window/join/WindowJoinSourceTwo.java        |   3 +-
 .../examples/window/sum/WindowSumSink.java      |   3 +-
 .../examples/window/sum/WindowSumSource.java    |   3 +-
 .../window/wordcount/WindowWordCountSink.java   |   3 +-
 64 files changed, 742 insertions(+), 744 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index 42795b4..eeac961 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -26,7 +26,7 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 2f9b613..d34b6c3 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -24,12 +24,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.flink.streaming.api.function.SourceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
 
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 4c0b636..1a77aee 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -21,8 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.SourceFunction;
-
+import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 6317b89..e6e8de5 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index c3d121b..4b197e3 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.SourceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
 import com.rabbitmq.client.Channel;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
index 673c560..c6a43f2 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.function.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.junit.Test;
 
 public class RMQTest {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index a94834a..6e16a85 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -29,9 +29,9 @@ import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment.ConnectionType;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.function.SinkFunction;
-import org.apache.flink.streaming.api.function.WriteFormatAsCsv;
-import org.apache.flink.streaming.api.function.WriteFormatAsText;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
 import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index 86c54ef..87cd154 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -28,17 +28,17 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.function.FileSourceFunction;
-import org.apache.flink.streaming.api.function.FileStreamFunction;
-import org.apache.flink.streaming.api.function.FromElementsFunction;
-import org.apache.flink.streaming.api.function.GenSequenceFunction;
-import org.apache.flink.streaming.api.function.PrintSinkFunction;
-import org.apache.flink.streaming.api.function.SinkFunction;
-import org.apache.flink.streaming.api.function.SourceFunction;
-import org.apache.flink.streaming.api.function.WriteFormatAsCsv;
-import org.apache.flink.streaming.api.function.WriteFormatAsText;
-import org.apache.flink.streaming.api.function.WriteSinkFunctionByBatches;
-import org.apache.flink.streaming.api.function.WriteSinkFunctionByMillis;
+import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
+import org.apache.flink.streaming.api.function.source.FileSourceFunction;
+import org.apache.flink.streaming.api.function.source.FileStreamFunction;
+import org.apache.flink.streaming.api.function.source.FromElementsFunction;
+import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FileSourceFunction.java
deleted file mode 100644
index 0c98117..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FileSourceFunction.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.function;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
-
-public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
-	private static final long serialVersionUID = 1L;
-	
-	private final String path;
-	private Tuple1<String> outTuple = new Tuple1<String>();
-	
-	public FileSourceFunction(String path) {
-		this.path = path;
-	}
-	
-	@Override
-	public void invoke(Collector<Tuple1<String>> collector) throws IOException {
-		BufferedReader br = new BufferedReader(new FileReader(path));
-		String line = br.readLine();
-		while (line != null) {
-			if (line != "") {
-				outTuple.f0 = line;
-				collector.collect(outTuple);
-			}
-			line = br.readLine();
-		}
-		br.close();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FileStreamFunction.java
deleted file mode 100644
index 15e685e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FileStreamFunction.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.function;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
-
-public class FileStreamFunction extends SourceFunction<Tuple1<String>>{
-	private static final long serialVersionUID = 1L;
-	
-	private final String path;
-	private Tuple1<String> outTuple = new Tuple1<String>();
-	
-	public FileStreamFunction(String path) {
-		this.path = path;
-	}
-	
-	@Override
-	public void invoke(Collector<Tuple1<String>> collector) throws IOException {
-		while(true){
-			BufferedReader br = new BufferedReader(new FileReader(path));
-			String line = br.readLine();
-			while (line != null) {
-				if (line != "") {
-					outTuple.f0 = line;
-					collector.collect(outTuple);
-				}
-				line = br.readLine();
-			}
-			br.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FromElementsFunction.java
deleted file mode 100755
index 6e6d194..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/FromElementsFunction.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.function;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
-
-public class FromElementsFunction<T> extends SourceFunction<Tuple1<T>> {
-	private static final long serialVersionUID = 1L;
-
-	Iterable<T> iterable;
-	Tuple1<T> outTuple = new Tuple1<T>();
-
-	public FromElementsFunction(T... elements) {
-		this.iterable = Arrays.asList(elements);
-	}
-
-	public FromElementsFunction(Collection<T> elements) {
-		this.iterable = elements;
-	}
-
-	@Override
-	public void invoke(Collector<Tuple1<T>> collector) throws Exception {
-		for (T element : iterable) {
-			outTuple.f0 = element;
-			collector.collect(outTuple);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/GenSequenceFunction.java
deleted file mode 100755
index 08505c3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/GenSequenceFunction.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.function;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
-
-/**
- * Source Function used to generate the number sequence
- * 
- */
-public class GenSequenceFunction extends SourceFunction<Tuple1<Long>> {
-
-	private static final long serialVersionUID = 1L;
-
-	long from;
-	long to;
-	Tuple1<Long> outTuple = new Tuple1<Long>();
-
-	public GenSequenceFunction(long from, long to) {
-		this.from = from;
-		this.to = to;
-	}
-
-	@Override
-	public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
-		for (long i = from; i <= to; i++) {
-			outTuple.f0 = i;
-			collector.collect(outTuple);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/PrintSinkFunction.java
deleted file mode 100755
index 18e34e8..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/PrintSinkFunction.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.function;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Dummy implementation of the SinkFunction writing every tuple to the standard
- * output. Used for print.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public class PrintSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke(IN tuple) {
-		System.out.println(tuple);
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/SinkFunction.java
deleted file mode 100644
index 2136ef3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/SinkFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.function;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-
-public abstract class SinkFunction<IN extends Tuple> extends AbstractFunction implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract void invoke(IN tuple);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/SourceFunction.java
deleted file mode 100755
index 0616605..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/SourceFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.function;
-
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public abstract class SourceFunction<OUT extends Tuple> extends UserSourceInvokable<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormat.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormat.java
deleted file mode 100644
index d5f63d8..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormat.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/***********************************************************************************************************************
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.streaming.api.function;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Abstract class for formatting the output of the writeAsText and writeAsCsv
- * functions.
- *
- * @param <IN>
- *            Input tuple type
- */
-public abstract class WriteFormat<IN extends Tuple> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Writes the contents of tupleList to the file specified by path.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param tupleList
-	 *            is the list of tuples to be written
-	 */
-	protected abstract void write(String path, ArrayList<IN> tupleList);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormatAsCsv.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormatAsCsv.java
deleted file mode 100644
index 95284b9..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormatAsCsv.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/***********************************************************************************************************************
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.streaming.api.function;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Writes tuples in csv format.
- *
- * @param <IN>
- *            Input tuple type
- */
-public class WriteFormatAsCsv<IN extends Tuple> extends WriteFormat<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	protected void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
-			for (IN tupleToWrite : tupleList) {
-				outStream.println(tupleToWrite.toString().substring(1,
-						tupleToWrite.toString().length() - 1));
-			}
-			outStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while writing file " + path, e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormatAsText.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormatAsText.java
deleted file mode 100644
index 7071dd6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteFormatAsText.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/***********************************************************************************************************************
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.streaming.api.function;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Writes tuples in text format.
- *
- * @param <IN>
- *            Input tuple type
- */
-public class WriteFormatAsText<IN extends Tuple> extends WriteFormat<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
-			for (IN tupleToWrite : tupleList) {
-				outStream.println(tupleToWrite);
-			}
-			outStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while writing file " + path, e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunction.java
deleted file mode 100644
index 9ef476d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunction.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/***********************************************************************************************************************
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.streaming.api.function;
-
-import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Simple implementation of the SinkFunction writing tuples as simple text to
- * the file specified by path. Tuples are collected to a list and written to the
- * file periodically. The file specified by path is created if it does not
- * exist, cleared if it exists before the writing.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public abstract class WriteSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	protected final String path;
-	protected ArrayList<IN> tupleList = new ArrayList<IN>();
-	protected final IN endTuple;
-	protected WriteFormat<IN> format;
-
-	public WriteSinkFunction(String path, WriteFormat<IN> format, IN endTuple) {
-		this.path = path;
-		this.format = format;
-		this.endTuple = endTuple;
-		cleanFile(path);
-	}
-
-	/**
-	 * Creates target file if it does not exist, cleans it if it exists.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 */
-	protected void cleanFile(String path) {
-		try {
-			PrintWriter writer;
-			writer = new PrintWriter(path);
-			writer.print("");
-			writer.close();
-		} catch (FileNotFoundException e) {
-			throw new RuntimeException("File not found " + path, e);
-		}
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	protected abstract boolean updateCondition();
-
-	/**
-	 * Statements to be executed after writing a batch goes here.
-	 */
-	protected abstract void resetParameters();
-
-	/**
-	 * Implementation of the invoke method of the SinkFunction class. Collects
-	 * the incoming tuples in tupleList and appends the list to the end of the
-	 * target file if updateCondition() is true or the current tuple is the
-	 * endTuple.
-	 */
-	@Override
-	public void invoke(IN tuple) {
-		if (!tuple.equals(endTuple)) {
-			tupleList.add(tuple);
-			if (updateCondition()) {
-				format.write(path, tupleList);
-				resetParameters();
-			}
-		} else {
-			format.write(path, tupleList);
-			resetParameters();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunctionByBatches.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunctionByBatches.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunctionByBatches.java
deleted file mode 100644
index b3dea66..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunctionByBatches.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/***********************************************************************************************************************
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.streaming.api.function;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Implementation of WriteSinkFunction. Writes tuples to file in equally sized
- * batches.
- *
- * @param <IN>
- *            Input tuple type
- */
-public class WriteSinkFunctionByBatches<IN extends Tuple> extends WriteSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private final int batchSize;
-
-	public WriteSinkFunctionByBatches(String path, WriteFormat<IN> format, int batchSize,
-			IN endTuple) {
-		super(path, format, endTuple);
-		this.batchSize = batchSize;
-	}
-
-	@Override
-	protected boolean updateCondition() {
-		return tupleList.size() >= batchSize;
-	}
-
-	@Override
-	protected void resetParameters() {
-		tupleList.clear();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunctionByMillis.java
deleted file mode 100644
index b843294..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WriteSinkFunctionByMillis.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/***********************************************************************************************************************
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.streaming.api.function;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Implementation of WriteSinkFunction. Writes tuples to file in every millis
- * milliseconds.
- *
- * @param <IN>
- *            Input tuple type
- */
-public class WriteSinkFunctionByMillis<IN extends Tuple> extends WriteSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private final long millis;
-	private long lastTime;
-
-	public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis, IN endTuple) {
-		super(path, format, endTuple);
-		this.millis = millis;
-		lastTime = System.currentTimeMillis();
-	}
-
-	@Override
-	protected boolean updateCondition() {
-		return System.currentTimeMillis() - lastTime >= millis;
-	}
-
-	@Override
-	protected void resetParameters() {
-		tupleList.clear();
-		lastTime = System.currentTimeMillis();
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
new file mode 100755
index 0000000..7918e48
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Dummy implementation of the SinkFunction writing every tuple to the standard
+ * output. Used for print.
+ * 
+ * @param <IN>
+ *            Input tuple type
+ */
+public class PrintSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void invoke(IN tuple) {
+		System.out.println(tuple);
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
new file mode 100644
index 0000000..cc4fb96
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+
+public abstract class SinkFunction<IN extends Tuple> extends AbstractFunction implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract void invoke(IN tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
new file mode 100644
index 0000000..18853b3
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Abstract class for formatting the output of the writeAsText and writeAsCsv
+ * functions.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public abstract class WriteFormat<IN extends Tuple> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Writes the contents of tupleList to the file specified by path.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param tupleList
+	 *            is the list of tuples to be written
+	 */
+	protected abstract void write(String path, ArrayList<IN> tupleList);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
new file mode 100644
index 0000000..e10a9c8
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Writes tuples in csv format.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteFormatAsCsv<IN extends Tuple> extends WriteFormat<IN> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	protected void write(String path, ArrayList<IN> tupleList) {
+		try {
+			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+			for (IN tupleToWrite : tupleList) {
+				outStream.println(tupleToWrite.toString().substring(1,
+						tupleToWrite.toString().length() - 1));
+			}
+			outStream.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Exception occured while writing file " + path, e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
new file mode 100644
index 0000000..2d591ae
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Writes tuples in text format.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteFormatAsText<IN extends Tuple> extends WriteFormat<IN> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void write(String path, ArrayList<IN> tupleList) {
+		try {
+			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+			for (IN tupleToWrite : tupleList) {
+				outStream.println(tupleToWrite);
+			}
+			outStream.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Exception occured while writing file " + path, e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
new file mode 100644
index 0000000..d473190
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Simple implementation of the SinkFunction writing tuples as simple text to
+ * the file specified by path. Tuples are collected to a list and written to the
+ * file periodically. The file specified by path is created if it does not
+ * exist, cleared if it exists before the writing.
+ * 
+ * @param <IN>
+ *            Input tuple type
+ */
+public abstract class WriteSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	protected final String path;
+	protected ArrayList<IN> tupleList = new ArrayList<IN>();
+	protected final IN endTuple;
+	protected WriteFormat<IN> format;
+
+	public WriteSinkFunction(String path, WriteFormat<IN> format, IN endTuple) {
+		this.path = path;
+		this.format = format;
+		this.endTuple = endTuple;
+		cleanFile(path);
+	}
+
+	/**
+	 * Creates target file if it does not exist, cleans it if it exists.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 */
+	protected void cleanFile(String path) {
+		try {
+			PrintWriter writer;
+			writer = new PrintWriter(path);
+			writer.print("");
+			writer.close();
+		} catch (FileNotFoundException e) {
+			throw new RuntimeException("File not found " + path, e);
+		}
+	}
+
+	/**
+	 * Condition for writing the contents of tupleList and clearing it.
+	 * 
+	 * @return value of the updating condition
+	 */
+	protected abstract boolean updateCondition();
+
+	/**
+	 * Statements to be executed after writing a batch goes here.
+	 */
+	protected abstract void resetParameters();
+
+	/**
+	 * Implementation of the invoke method of the SinkFunction class. Collects
+	 * the incoming tuples in tupleList and appends the list to the end of the
+	 * target file if updateCondition() is true or the current tuple is the
+	 * endTuple.
+	 */
+	@Override
+	public void invoke(IN tuple) {
+		if (!tuple.equals(endTuple)) {
+			tupleList.add(tuple);
+			if (updateCondition()) {
+				format.write(path, tupleList);
+				resetParameters();
+			}
+		} else {
+			format.write(path, tupleList);
+			resetParameters();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
new file mode 100644
index 0000000..3797d13
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Implementation of WriteSinkFunction. Writes tuples to file in equally sized
+ * batches.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteSinkFunctionByBatches<IN extends Tuple> extends WriteSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private final int batchSize;
+
+	public WriteSinkFunctionByBatches(String path, WriteFormat<IN> format, int batchSize,
+			IN endTuple) {
+		super(path, format, endTuple);
+		this.batchSize = batchSize;
+	}
+
+	@Override
+	protected boolean updateCondition() {
+		return tupleList.size() >= batchSize;
+	}
+
+	@Override
+	protected void resetParameters() {
+		tupleList.clear();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
new file mode 100644
index 0000000..cb77e6d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -0,0 +1,54 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Implementation of WriteSinkFunction. Writes tuples to file in every millis
+ * milliseconds.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteSinkFunctionByMillis<IN extends Tuple> extends WriteSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private final long millis;
+	private long lastTime;
+
+	public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis, IN endTuple) {
+		super(path, format, endTuple);
+		this.millis = millis;
+		lastTime = System.currentTimeMillis();
+	}
+
+	@Override
+	protected boolean updateCondition() {
+		return System.currentTimeMillis() - lastTime >= millis;
+	}
+
+	@Override
+	protected void resetParameters() {
+		tupleList.clear();
+		lastTime = System.currentTimeMillis();
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
new file mode 100644
index 0000000..f016cbc
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.util.Collector;
+
+public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
+	private static final long serialVersionUID = 1L;
+	
+	private final String path;
+	private Tuple1<String> outTuple = new Tuple1<String>();
+	
+	public FileSourceFunction(String path) {
+		this.path = path;
+	}
+	
+	@Override
+	public void invoke(Collector<Tuple1<String>> collector) throws IOException {
+		BufferedReader br = new BufferedReader(new FileReader(path));
+		String line = br.readLine();
+		while (line != null) {
+			if (line != "") {
+				outTuple.f0 = line;
+				collector.collect(outTuple);
+			}
+			line = br.readLine();
+		}
+		br.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
new file mode 100644
index 0000000..c3ccedf
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -0,0 +1,54 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.util.Collector;
+
+public class FileStreamFunction extends SourceFunction<Tuple1<String>>{
+	private static final long serialVersionUID = 1L;
+	
+	private final String path;
+	private Tuple1<String> outTuple = new Tuple1<String>();
+	
+	public FileStreamFunction(String path) {
+		this.path = path;
+	}
+	
+	@Override
+	public void invoke(Collector<Tuple1<String>> collector) throws IOException {
+		while(true){
+			BufferedReader br = new BufferedReader(new FileReader(path));
+			String line = br.readLine();
+			while (line != null) {
+				if (line != "") {
+					outTuple.f0 = line;
+					collector.collect(outTuple);
+				}
+				line = br.readLine();
+			}
+			br.close();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
new file mode 100755
index 0000000..dfe29d2
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.util.Collector;
+
+public class FromElementsFunction<T> extends SourceFunction<Tuple1<T>> {
+	private static final long serialVersionUID = 1L;
+
+	Iterable<T> iterable;
+	Tuple1<T> outTuple = new Tuple1<T>();
+
+	public FromElementsFunction(T... elements) {
+		this.iterable = Arrays.asList(elements);
+	}
+
+	public FromElementsFunction(Collection<T> elements) {
+		this.iterable = elements;
+	}
+
+	@Override
+	public void invoke(Collector<Tuple1<T>> collector) throws Exception {
+		for (T element : iterable) {
+			outTuple.f0 = element;
+			collector.collect(outTuple);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
new file mode 100755
index 0000000..706295e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.util.Collector;
+
+/**
+ * Source Function used to generate the number sequence
+ * 
+ */
+public class GenSequenceFunction extends SourceFunction<Tuple1<Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	long from;
+	long to;
+	Tuple1<Long> outTuple = new Tuple1<Long>();
+
+	public GenSequenceFunction(long from, long to) {
+		this.from = from;
+		this.to = to;
+	}
+
+	@Override
+	public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
+		for (long i = from; i <= to; i++) {
+			outTuple.f0 = i;
+			collector.collect(outTuple);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
new file mode 100755
index 0000000..70553bf
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public abstract class SourceFunction<OUT extends Tuple> extends UserSourceInvokable<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 2ba9da5..cb16307 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,9 +19,8 @@
 
 package org.apache.flink.streaming.api.invokable;
 
-import org.apache.flink.streaming.api.function.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index d1c4e9b..731589b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamCollector;
-import org.apache.flink.streaming.api.function.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
 import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
 import org.apache.flink.streaming.api.invokable.UserSourceInvokable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 4e6e223..8ba58c5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -26,7 +26,7 @@ import java.util.List;
 
 import org.apache.flink.api.java.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.function.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.flink.util.Collector;
 import org.apache.log4j.Level;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index c5cdd98..e296733 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -1,3 +1,22 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertTrue;
@@ -10,7 +29,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.function.SourceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/60f632a3/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index 52a5cc7..fc0efcf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -1,4 +1,4 @@
-/***********************************************************************************************************************
+/**
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *
- **********************************************************************************************************************/
+ */
 
 package org.apache.flink.streaming.api;
 
@@ -29,7 +29,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.function.SourceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 


Mime
View raw message