flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-2160] Change Streaming Source Interface to run(Context)/cancel()
Date Fri, 05 Jun 2015 13:34:53 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4b9e34047 -> 235b02cb5


[FLINK-2160] Change Streaming Source Interface to run(Context)/cancel()

The context can be used to emit elements and retrieve the checkpointing
lock object.

In the future, the context can be extended to provide support for
element emission with timestamps and dealing with the watermark system.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/235b02cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/235b02cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/235b02cb

Branch: refs/heads/master
Commit: 235b02cb56d24bb5db3c254fa34661ccc0e1a083
Parents: 4b9e340
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Jun 4 11:29:17 2015 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Jun 5 15:34:20 2015 +0200

----------------------------------------------------------------------
 .../hbase/example/HBaseWriteStreamExample.java  |  2 +-
 .../connectors/kafka/KafkaProducerExample.java  |  7 ++---
 .../connectors/kafka/api/KafkaSource.java       |  5 ++--
 .../api/persistent/PersistentKafkaSource.java   |  9 +++---
 .../streaming/connectors/kafka/KafkaITCase.java | 31 ++++++++++----------
 .../connectors/rabbitmq/RMQSource.java          |  5 ++--
 .../connectors/twitter/TwitterSource.java       |  5 ++--
 .../source/FileMonitoringFunction.java          |  7 ++---
 .../functions/source/FileSourceFunction.java    |  5 ++--
 .../functions/source/FromElementsFunction.java  |  6 ++--
 .../functions/source/FromIteratorFunction.java  |  6 ++--
 .../source/FromSplittableIteratorFunction.java  |  5 ++--
 .../source/SocketTextStreamFunction.java        | 11 ++++---
 .../api/functions/source/SourceFunction.java    | 29 +++++++++++++++---
 .../streaming/api/operators/StreamSource.java   | 16 ++++++++--
 .../api/ChainedRuntimeContextTest.java          |  3 +-
 .../flink/streaming/api/TypeFillTest.java       |  2 +-
 .../api/complex/ComplexIntegrationTest.java     | 15 ++++------
 .../windowing/WindowIntegrationTest.java        |  8 ++---
 .../api/streamtask/StreamVertexTest.java        |  5 ++--
 .../runtime/tasks/SourceStreamTaskTest.java     |  6 ++--
 .../apache/flink/streaming/util/MockSource.java | 16 ++++++++--
 .../examples/iteration/IterateExample.java      |  5 ++--
 .../streaming/examples/join/WindowJoin.java     |  9 +++---
 .../ml/IncrementalLearningSkeleton.java         | 12 ++++----
 .../examples/windowing/SessionWindowing.java    |  6 ++--
 .../windowing/TopSpeedWindowingExample.java     |  6 ++--
 .../api/scala/StreamExecutionEnvironment.scala  |  7 +++--
 .../StreamCheckpointingITCase.java              |  7 +++--
 .../ProcessFailureStreamingRecoveryITCase.java  | 12 ++++----
 30 files changed, 147 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
index 74c6c57..d1a61d3 100644
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -53,7 +53,7 @@ public class HBaseWriteStreamExample {
 			private volatile boolean isRunning = true;
 
 			@Override
-			public void run(Object checkpointLock, Collector<String> out) throws Exception {
+			public void run(SourceContext<String> out) throws Exception {
 				while (isRunning) {
 					out.collect(String.valueOf(Math.floor(Math.random() * 100)));
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 4d98b1b..f241d1c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -21,7 +21,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.util.Collector;
 
 @SuppressWarnings("serial")
 public class KafkaProducerExample {
@@ -44,13 +43,13 @@ public class KafkaProducerExample {
 			private volatile boolean running = true;
 			
 			@Override
-			public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+			public void run(SourceContext<String> ctx) throws Exception {
 				for (int i = 0; i < 20 && running; i++) {
-					collector.collect("message #" + i);
+					ctx.collect("message #" + i);
 					Thread.sleep(100L);
 				}
 
-				collector.collect("q");
+				ctx.collect("q");
 			}
 
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index a4c56e4..3bcbfa7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -32,7 +32,6 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.ConnectorSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,7 +185,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<OUT> collector) throws Exception {
+	public void run(SourceContext<OUT> ctx) throws Exception {
 		
 		// NOTE: Since this source is not checkpointed, we do not need to
 		// acquire the checkpoint lock
@@ -196,7 +195,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 				if (schema.isEndOfStream(out)) {
 					break;
 				}
-				collector.collect(out);
+				ctx.collect(out);
 			}
 		} finally {
 			consumer.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 5d77a1a..bda6076 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Collector;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,10 +168,12 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<OUT> collector) throws Exception {
+	public void run(SourceContext<OUT> ctx) throws Exception {
 		if (iteratorToRead == null) {
 			throw new IllegalStateException("Kafka iterator not initialized properly.");
 		}
+
+		final Object checkpointLock = ctx.getCheckpointLock();
 		
 		while (running && iteratorToRead.hasNext()) {
 			MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
@@ -190,7 +191,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 			// make the state update and the element emission atomic
 			synchronized (checkpointLock) {
 				lastOffsets[message.partition()] = message.offset();
-				collector.collect(next);
+				ctx.collect(next);
 			}
 
 			if (LOG.isTraceEnabled()) {
@@ -378,4 +379,4 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 			}
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index d695b09..0a12b07 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -399,13 +399,14 @@ public class KafkaITCase {
 			boolean running = true;
 
 			@Override
-			public void run(Object checkpointLock, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
+			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
 				LOG.info("Starting source.");
 				int cnt = from;
 				int partition = getRuntimeContext().getIndexOfThisSubtask();
 				while (running) {
 					LOG.info("Writing " + cnt + " to partition " + partition);
-					collector.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
+					ctx.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(),
+							cnt));
 					if (cnt == to) {
 						LOG.info("Writer reached end.");
 						return;
@@ -492,11 +493,11 @@ public class KafkaITCase {
 			boolean running = true;
 
 			@Override
-			public void run(Object checkpointLock, Collector<Tuple2<Long, String>> collector) throws Exception {
+			public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
 				LOG.info("Starting source.");
 				int cnt = 0;
 				while (running) {
-					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
 					try {
 						Thread.sleep(100);
 					} catch (InterruptedException ignored) {
@@ -576,11 +577,11 @@ public class KafkaITCase {
 			boolean running = true;
 
 			@Override
-			public void run(Object checkpointLock, Collector<Tuple2<Long, String>> collector) throws Exception {
+			public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
 				LOG.info("Starting source.");
 				int cnt = 0;
 				while (running) {
-					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
 					LOG.info("Produced " + cnt);
 
 					try {
@@ -668,14 +669,14 @@ public class KafkaITCase {
 			}
 
 			@Override
-			public void run(Object checkpointLock, Collector<Tuple2<Long, byte[]>> collector) throws Exception {
+			public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
 				LOG.info("Starting source.");
 				long cnt = 0;
 				Random rnd = new Random(1337);
 				while (running) {
 					//
 					byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
-					collector.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+					ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
 					LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
 
 					try {
@@ -685,7 +686,7 @@ public class KafkaITCase {
 					if(cnt == 10) {
 						LOG.info("Send end signal");
 						// signal end
-						collector.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+						ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
 						running = false;
 					}
 				}
@@ -776,11 +777,11 @@ public class KafkaITCase {
 			boolean running = true;
 
 			@Override
-			public void run(Object checkpointLock, Collector<Tuple2<Long, String>> collector) throws Exception {
+			public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
 				LOG.info("Starting source.");
 				int cnt = 0;
 				while (running) {
-					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
 					try {
 						Thread.sleep(100);
 					} catch (InterruptedException ignored) {
@@ -869,11 +870,11 @@ public class KafkaITCase {
 			boolean running = true;
 
 			@Override
-			public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+			public void run(SourceContext<String> ctx) throws Exception {
 				LOG.info("Starting source.");
 				int cnt = 0;
 				while (running) {
-					collector.collect("kafka-" + cnt++);
+					ctx.collect("kafka-" + cnt++);
 					try {
 						Thread.sleep(100);
 					} catch (InterruptedException ignored) {
@@ -911,12 +912,12 @@ public class KafkaITCase {
 			boolean running = true;
 
 			@Override
-			public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+			public void run(SourceContext<String> ctx) throws Exception {
 				LOG.info("Starting source.");
 				int cnt = 0;
 				while (running) {
 					String msg = "kafka-" + cnt++;
-					collector.collect(msg);
+					ctx.collect(msg);
 					LOG.info("sending message = "+msg);
 
 					if ((cnt - 1) % 20 == 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 93812c2..b18b8d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -27,7 +27,6 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
-import org.apache.flink.util.Collector;
 
 public class RMQSource<OUT> extends ConnectorSource<OUT> {
 	private static final long serialVersionUID = 1L;
@@ -86,7 +85,7 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<OUT> out) throws Exception {
+	public void run(SourceContext<OUT> ctx) throws Exception {
 		while (running) {
 			delivery = consumer.nextDelivery();
 
@@ -95,7 +94,7 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
 				break;
 			}
 
-			out.collect(result);
+			ctx.collect(result);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index a1a6d9b..bad0f8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -26,7 +26,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,7 +208,7 @@ public class TwitterSource extends RichSourceFunction<String> {
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<String> out) throws Exception {
+	public void run(SourceContext<String> ctx) throws Exception {
 		while (isRunning) {
 			if (client.isDone()) {
 				if (LOG.isErrorEnabled()) {
@@ -219,7 +218,7 @@ public class TwitterSource extends RichSourceFunction<String> {
 				break;
 			}
 
-			out.collect(queue.take());
+			ctx.collect(queue.take());
 
 			if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) {
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index a9166c1..2c85650 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +62,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+	public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
 		FileSystem fileSystem = FileSystem.get(new URI(path));
 
 		while (isRunning) {
@@ -71,7 +70,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 			for (String filePath : files) {
 				if (watchType == WatchType.ONLY_NEW_FILES
 						|| watchType == WatchType.REPROCESS_WITH_APPENDED) {
-					collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
+					ctx.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
 					offsetOfFiles.put(filePath, -1L);
 				} else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
 					long offset = 0;
@@ -80,7 +79,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 						offset = offsetOfFiles.get(filePath);
 					}
 
-					collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
+					ctx.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
 					offsetOfFiles.put(filePath, fileSize);
 
 					LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index ebbff9c..cf08e5a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -116,7 +115,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<OUT> out) throws Exception {
+	public void run(SourceContext<OUT> ctx) throws Exception {
 		while (isRunning) {
 			OUT nextElement = serializer.createInstance();
 			nextElement =  format.nextRecord(nextElement);
@@ -126,7 +125,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
 			} else if (nextElement == null) {
 				break;
 			}
-			out.collect(nextElement);
+			ctx.collect(nextElement);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 838cfa4..736cc73 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -21,8 +21,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 
-import org.apache.flink.util.Collector;
-
 public class FromElementsFunction<T> implements SourceFunction<T> {
 	
 	private static final long serialVersionUID = 1L;
@@ -44,11 +42,11 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<T> out) throws Exception {
+	public void run(SourceContext<T> ctx) throws Exception {
 		Iterator<T> it = iterable.iterator();
 
 		while (isRunning && it.hasNext()) {
-			out.collect(it.next());
+			ctx.collect(it.next());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
index 7320d77..655710e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import org.apache.flink.util.Collector;
-
 import java.util.Iterator;
 
 public class FromIteratorFunction<T> implements SourceFunction<T> {
@@ -34,9 +32,9 @@ public class FromIteratorFunction<T> implements SourceFunction<T> {
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<T> out) throws Exception {
+	public void run(SourceContext<T> ctx) throws Exception {
 		while (isRunning && iterator.hasNext()) {
-			out.collect(iterator.next());
+			ctx.collect(iterator.next());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
index 61e1b7f..bc78e4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 
 import java.util.Iterator;
@@ -46,9 +45,9 @@ public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunctio
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<T> out) throws Exception {
+	public void run(SourceContext<T> ctx) throws Exception {
 		while (isRunning && iterator.hasNext()) {
-			out.collect(iterator.next());
+			ctx.collect(iterator.next());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index c5800f0..fb66f16 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -26,7 +26,6 @@ import java.net.Socket;
 import java.net.SocketException;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,11 +63,11 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	}
 
 	@Override
-	public void run(Object checkpointLock, Collector<String> collector) throws Exception {
-		streamFromSocket(collector, socket);
+	public void run(SourceContext<String> ctx) throws Exception {
+		streamFromSocket(ctx, socket);
 	}
 
-	public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
+	public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
 		try {
 			StringBuffer buffer = new StringBuffer();
 			BufferedReader reader = new BufferedReader(new InputStreamReader(
@@ -117,7 +116,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 				}
 
 				if (data == delimiter) {
-					collector.collect(buffer.toString());
+					ctx.collect(buffer.toString());
 					buffer = new StringBuffer();
 				} else if (data != '\r') { // ignore carriage return
 					buffer.append((char) data);
@@ -125,7 +124,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 			}
 
 			if (buffer.length() > 0) {
-				collector.collect(buffer.toString());
+				ctx.collect(buffer.toString());
 			}
 		} finally {
 			socket.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 8c349e9..4c6ec1a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
@@ -84,10 +83,9 @@ public interface SourceFunction<T> extends Function, Serializable {
 	 * elements. Also, the update of state and emission of elements must happen in the same
 	 * synchronized block.
 	 *
-	 * @param checkpointLock The object to synchronize on when updating state and emitting elements.
-	 * @param out The collector to use for emitting elements
+	 * @param ctx The context for interaction with the outside world.
 	 */
-	void run(final Object checkpointLock, Collector<T> out) throws Exception;
+	void run(SourceContext<T> ctx) throws Exception;
 
 	/**
 	 * Cancels the source. Most sources will have a while loop inside the
@@ -96,4 +94,27 @@ public interface SourceFunction<T> extends Function, Serializable {
 	 * is set to false in this method.
 	 */
 	void cancel();
+
+	/**
+	 * Interface that source functions use to communicate with the outside world. Normally
+	 * sources would just emit elements in a loop using {@link #collect}. If the source is a
+	 * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} source it must retrieve
+	 * the checkpoint lock object and use it to protect state updates and element emission as
+	 * described in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
+	 *
+	 * @param <T> The type of the elements produced by the source.
+	 */
+	public static interface SourceContext<T> {
+
+		/**
+		 * Emits one element from the source.
+		 */
+		public void collect(T element);
+
+		/**
+		 * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources
+		 * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
+		 */
+		public Object getCheckpointLock();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index e63349a..907f93a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -33,8 +33,20 @@ public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunc
 		this.chainingStrategy = ChainingStrategy.HEAD;
 	}
 
-	public void run(Object lockingObject, Collector<OUT> collector) throws Exception {
-		userFunction.run(lockingObject, collector);
+	public void run(final Object lockingObject, final Collector<OUT> collector) throws Exception {
+		SourceFunction.SourceContext<OUT> ctx = new SourceFunction.SourceContext<OUT>() {
+			@Override
+			public void collect(OUT element) {
+				collector.collect(element);
+			}
+
+			@Override
+			public Object getCheckpointLock() {
+				return lockingObject;
+			}
+		};
+
+		userFunction.run(ctx);
 	}
 
 	public void cancel() {

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
index fef8a31..3ad6b8e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -47,7 +46,7 @@ public class ChainedRuntimeContextTest {
 	private static class TestSource extends RichParallelSourceFunction<Integer> {
 
 		@Override
-		public void run(Object checkpointLock, Collector<Integer> out) throws Exception {
+		public void run(SourceContext<Integer> ctx) throws Exception {
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 1dbbc00..399297b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -118,7 +118,7 @@ public class TypeFillTest {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void run(Object checkpointLock, Collector<T> out) throws Exception {
+		public void run(SourceContext<T> ctx) throws Exception {
 
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 2858658..809668d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -484,11 +484,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		long cnt = 0;
 
 		@Override
-		public void run(Object checkpointLock,
-				Collector<OuterPojo> out) throws Exception {
+		public void run(SourceContext<OuterPojo> ctx) throws Exception {
 			for (int i = 0; i < 20; i++) {
 				OuterPojo result = new OuterPojo(new InnerPojo(cnt / 2, "water_melon-b"), 2L);
-				out.collect(result);
+				ctx.collect(result);
 			}
 		}
 
@@ -502,11 +501,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void run(Object checkpointLock,
-				Collector<Tuple2<Long, Tuple2<String, Long>>> out) throws Exception {
+		public void run(SourceContext<Tuple2<Long, Tuple2<String, Long>>> ctx) throws Exception {
 			for (int i = 0; i < 20; i++) {
 				Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L));
-				out.collect(result);
+				ctx.collect(result);
 			}
 		}
 
@@ -622,10 +620,9 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public void run(Object checkpointLock,
-				Collector<Rectangle> out) throws Exception {
+		public void run(SourceContext<Rectangle> ctx) throws Exception {
 			for (int i = 0; i < 100; i++) {
-				out.collect(rectangle);
+				ctx.collect(rectangle);
 				rectangle = rectangle.next();
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
index ec97984..ec8cda8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
@@ -162,9 +162,9 @@ public class WindowIntegrationTest implements Serializable {
 			private static final long serialVersionUID = 1L;
 
 			@Override
-			public void run(Object checkpointLock, Collector<Integer> out) throws Exception {
+			public void run(SourceContext<Integer> ctx) throws Exception {
 				for (int i = 1; i <= 10; i++) {
-					out.collect(i);
+					ctx.collect(i);
 				}
 			}
 
@@ -189,9 +189,9 @@ public class WindowIntegrationTest implements Serializable {
 			}
 
 			@Override
-			public void run(Object checkpointLock, Collector<Integer> out) throws Exception {
+			public void run(SourceContext<Integer> ctx) throws Exception {
 				for (;i < 11; i += 2) {
-					out.collect(i);
+					ctx.collect(i);
 				}
 
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index 55f4add..9085034 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -37,7 +37,6 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 public class StreamVertexTest {
@@ -52,10 +51,10 @@ public class StreamVertexTest {
 		private int i = 0;
 
 		@Override
-		public void run(Object checkpointLock, Collector<Tuple1<Integer>> out) throws Exception {
+		public void run(SourceContext<Tuple1<Integer>> ctx) throws Exception {
 			for (int i = 0; i < 10; i++) {
 				tuple.f0 = i;
-				out.collect(tuple);
+				ctx.collect(tuple);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index ef44053..c745e6c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -169,7 +168,8 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 		}
 
 		@Override
-		public void run(final Object lockObject, Collector<Tuple2<Long, Integer>> out) {
+		public void run(SourceContext<Tuple2<Long, Integer>> ctx) {
+			final Object lockObject = ctx.getCheckpointLock();
 			while (isRunning && count < maxElements) {
 				// simulate some work
 				try {
@@ -179,7 +179,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 				}
 
 				synchronized (lockObject) {
-					out.collect(new Tuple2<Long, Integer>(lastCheckpointId, count));
+					ctx.collect(new Tuple2<Long, Integer>(lastCheckpointId, count));
 					count++;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
index 77600f3..5bf3b61 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
@@ -33,8 +33,20 @@ public class MockSource<T> {
 			((RichSourceFunction<T>) sourceFunction).open(new Configuration());
 		}
 		try {
-			Collector<T> collector = new MockOutput<T>(outputs);
-			sourceFunction.run(new Object(), collector);
+			final Collector<T> collector = new MockOutput<T>(outputs);
+			final Object lockObject = new Object();
+			SourceFunction.SourceContext<T> ctx = new SourceFunction.SourceContext<T>() {
+				@Override
+				public void collect(T element) {
+					collector.collect(element);
+				}
+
+				@Override
+				public Object getCheckpointLock() {
+					return lockObject;
+				}
+			};
+			sourceFunction.run(ctx);
 		} catch (Exception e) {
 			throw new RuntimeException("Cannot invoke source.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 2682942..78d361d 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -114,13 +113,13 @@ public class IterateExample {
 		private volatile boolean isRunning = true;
 
 		@Override
-		public void run(Object checkpointLock, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
+		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
 
 			while (isRunning) {
 				int first = rnd.nextInt(BOUND / 2 - 1) + 1;
 				int second = rnd.nextInt(BOUND / 2 - 1) + 1;
 
-				collector.collect(new Tuple2<Integer, Integer>(first, second));
+				ctx.collect(new Tuple2<Integer, Integer>(first, second));
 				Thread.sleep(500L);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 8d7e5de..0fec4c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.util.Collector;
 
 import java.util.Random;
 
@@ -112,12 +111,12 @@ public class WindowJoin {
 		}
 
 		@Override
-		public void run(Object checkpointLock, Collector<Tuple2<String, Integer>> out) throws Exception {
+		public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
 			while (isRunning) {
 				outTuple.f0 = names[rand.nextInt(names.length)];
 				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-				out.collect(outTuple);
+				ctx.collect(outTuple);
 			}
 		}
 
@@ -146,12 +145,12 @@ public class WindowJoin {
 
 
 		@Override
-		public void run(Object checkpointLock, Collector<Tuple2<String, Integer>> out) throws Exception {
+		public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
 			while (isRunning) {
 				outTuple.f0 = names[rand.nextInt(names.length)];
 				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-				out.collect(outTuple);
+				ctx.collect(outTuple);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 1b30f59..48111f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -97,9 +97,9 @@ public class IncrementalLearningSkeleton {
 		private volatile boolean isRunning = true;
 
 		@Override
-		public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+		public void run(SourceContext<Integer> ctx) throws Exception {
 			while (isRunning) {
-				collector.collect(getNewData());
+				ctx.collect(getNewData());
 			}
 		}
 
@@ -123,10 +123,10 @@ public class IncrementalLearningSkeleton {
 		private int counter;
 
 		@Override
-		public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+		public void run(SourceContext<Integer> ctx) throws Exception {
 			Thread.sleep(15);
 			while (counter < 50) {
-				collector.collect(getNewData());
+				ctx.collect(getNewData());
 			}
 		}
 
@@ -153,7 +153,7 @@ public class IncrementalLearningSkeleton {
 		private volatile boolean isRunning = true;
 
 		@Override
-		public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+		public void run(SourceContext<Integer> collector) throws Exception {
 			while (isRunning) {
 				collector.collect(getTrainingData());
 			}
@@ -181,7 +181,7 @@ public class IncrementalLearningSkeleton {
 		private int counter = 0;
 
 		@Override
-		public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+		public void run(SourceContext<Integer> collector) throws Exception {
 			while (counter < 8200) {
 				collector.collect(getTrainingData());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 2dd9378..f8d8652 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -58,13 +57,12 @@ public class SessionWindowing {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void run(Object checkpointLock, Collector<Tuple3<String, Long, Integer>> collector)
-							throws Exception {
+					public void run(SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
 						for (Tuple3<String, Long, Integer> value : input) {
 							// We sleep three seconds between every output so we
 							// can see whether we properly detect sessions
 							// before the next start for a specific id
-							collector.collect(value);
+							ctx.collect(value);
 							if (!fileOutput) {
 								System.out.println("Collected: " + value);
 								Thread.sleep(3000);

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index 850e30d..657ce2a 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.util.Collector;
 
 import java.util.Arrays;
 import java.util.Random;
@@ -108,8 +107,7 @@ public class TopSpeedWindowingExample {
 		}
 
 		@Override
-		public void run(Object checkpointLock, Collector<Tuple4<Integer, Integer, Double, Long>> collector)
-				throws Exception {
+		public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
 
 			while (isRunning) {
 				Thread.sleep(1000);
@@ -122,7 +120,7 @@ public class TopSpeedWindowingExample {
 					distances[carId] += speeds[carId] / 3.6d;
 					Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
 							speeds[carId], distances[carId], System.currentTimeMillis());
-					collector.collect(record);
+					ctx.collect(record);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index b2ccf8c..008ad6c 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.StateHandleProvider
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{Collector, SplittableIterator}
@@ -405,12 +406,12 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * source functionality.
    *
    */
-  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
+  def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
     require(function != null, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
       val cleanFun = StreamExecutionEnvironment.clean(function)
-      override def run(lockObject: AnyRef, out: Collector[T]) {
-        cleanFun(out)
+      override def run(ctx: SourceContext[T]) {
+        cleanFun(ctx)
       }
       override def cancel() = {}
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 64e1f24..f0eef9d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
-import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -219,7 +218,9 @@ public class StreamCheckpointingITCase {
 		}
 
 		@Override
-		public void run(Object lockingObject, Collector out) throws Exception {
+		public void run(SourceContext<String> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
 			while (isRunning && index < numElements) {
 				char first = (char) ((index % 40) + 40);
 
@@ -230,7 +231,7 @@ public class StreamCheckpointingITCase {
 
 				synchronized (lockingObject) {
 					index += step;
-					out.collect(result);
+					ctx.collect(result);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 0b99e04..626b1d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
 import org.junit.Assert;
 
 /**
@@ -124,12 +123,13 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		}
 
 		@Override
-		public void run(Object checkpointLock, Collector<Long> collector) throws Exception {
+		public void run(SourceContext<Long> sourceCtx) throws Exception {
+			final Object checkpointLock = sourceCtx.getCheckpointLock();
 
-			StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+			StreamingRuntimeContext runtimeCtx = (StreamingRuntimeContext) getRuntimeContext();
 
-			final long stepSize = context.getNumberOfParallelSubtasks();
-			final long congruence = context.getIndexOfThisSubtask();
+			final long stepSize = runtimeCtx.getNumberOfParallelSubtasks();
+			final long congruence = runtimeCtx.getIndexOfThisSubtask();
 			final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
 
 			final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
@@ -148,7 +148,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 				}
 
 				synchronized (checkpointLock) {
-					collector.collect(collected * stepSize + congruence);
+					sourceCtx.collect(collected * stepSize + congruence);
 					collected++;
 				}
 			}


Mime
View raw message