flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [2/3] flink git commit: [FLINK-1625] [streaming] [api-breaking] Added proper cancellation to StreamInvokables + Sink- and SourceFunction interfaces extended with cancel method
Date Wed, 04 Mar 2015 21:53:15 GMT
[FLINK-1625] [streaming] [api-breaking] Added proper cancellation to StreamInvokables + Sink- and SourceFunction interfaces extended with cancel method


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8436e9ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8436e9ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8436e9ce

Branch: refs/heads/master
Commit: 8436e9ce31b52f1bd8c55b8e8c50cafb57cff84f
Parents: 3abd6c8
Author: Gyula Fora <gyfora@apache.org>
Authored: Wed Mar 4 10:24:00 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Wed Mar 4 22:38:41 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/flume/FlumeSink.java   |   7 ++
 .../streaming/connectors/flume/FlumeSource.java |   6 +-
 .../connectors/kafka/KafkaProducerExample.java  |   8 +-
 .../connectors/kafka/api/KafkaSink.java         |  37 ++++---
 .../connectors/kafka/api/KafkaSource.java       |  32 ++++--
 .../streaming/connectors/rabbitmq/RMQSink.java  |   5 +
 .../connectors/rabbitmq/RMQSource.java          |  42 ++++---
 .../connectors/twitter/TwitterSource.java       |  35 +++---
 .../connectors/twitter/TwitterStreaming.java    |   4 +
 .../api/function/sink/FileSinkFunction.java     |   9 ++
 .../api/function/sink/PrintSinkFunction.java    |   8 +-
 .../api/function/sink/SinkFunction.java         |   4 +-
 .../sink/WriteSinkFunctionByMillis.java         |   5 +
 .../function/source/FileMonitoringFunction.java |  18 ++-
 .../api/function/source/FileSourceFunction.java |  28 +++--
 .../function/source/FromElementsFunction.java   |   6 +-
 .../function/source/GenSequenceFunction.java    |   6 +-
 .../source/SocketTextStreamFunction.java        | 110 ++++++++++++-------
 .../api/function/source/SourceFunction.java     |   6 +-
 .../streaming/api/invokable/SinkInvokable.java  |   8 +-
 .../api/invokable/SourceInvokable.java          |   8 +-
 .../api/invokable/StreamInvokable.java          |  12 +-
 .../invokable/operator/CounterInvokable.java    |   6 +-
 .../api/invokable/operator/FilterInvokable.java |   8 +-
 .../invokable/operator/FlatMapInvokable.java    |   8 +-
 .../api/invokable/operator/MapInvokable.java    |   8 +-
 .../invokable/operator/ProjectInvokable.java    |   2 +-
 .../operator/StreamReduceInvokable.java         |   8 +-
 .../api/invokable/operator/co/CoInvokable.java  |  17 ++-
 .../windowing/GroupedStreamDiscretizer.java     |   7 +-
 .../windowing/GroupedWindowBufferInvokable.java |   8 +-
 .../operator/windowing/StreamDiscretizer.java   |   2 +-
 .../windowing/WindowBufferInvokable.java        |  11 +-
 .../operator/windowing/WindowFlattener.java     |   8 +-
 .../operator/windowing/WindowMerger.java        |   8 +-
 .../operator/windowing/WindowPartitioner.java   |   8 +-
 .../api/streamvertex/StreamVertex.java          |   7 ++
 .../apache/flink/streaming/api/IterateTest.java |   4 +
 .../flink/streaming/api/OutputSplitterTest.java |  16 +++
 .../streaming/api/WindowCrossJoinTest.java      |   8 ++
 .../api/collector/DirectedOutputTest.java       |   4 +
 .../windowing/WindowIntegrationTest.java        |  28 +++++
 .../api/streamvertex/StreamVertexTest.java      |  15 ++-
 .../apache/flink/streaming/util/MockSource.java |   2 +-
 .../streaming/examples/join/WindowJoin.java     |  14 ++-
 .../ml/IncrementalLearningSkeleton.java         |  14 ++-
 .../examples/windowing/StockPrices.java         |  14 ++-
 .../windowing/TopSpeedWindowingExample.java     |   6 +-
 .../flink/streaming/api/scala/DataStream.scala  |   1 +
 .../api/scala/StreamExecutionEnvironment.scala  |   3 +-
 .../test/classloading/jar/StreamingProgram.java |  10 +-
 51 files changed, 481 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 8a2f2b8..86fd1b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -134,6 +134,13 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	}
 
 	@Override
+	public void cancel() {
+		if (client != null) {
+			client.client.close();
+		}
+	}
+
+	@Override
 	public void open(Configuration config) {
 		client = new FlinkRpcClientFacade();
 		client.init(host, port);

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 4f6ec2d..2a321a2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -130,7 +130,7 @@ public class FlumeSource<OUT> extends ConnectorSource<OUT> {
 	 *            The Collector for sending data to the datastream
 	 */
 	@Override
-	public void invoke(Collector<OUT> collector) throws Exception {
+	public void run(Collector<OUT> collector) throws Exception {
 		configureAvroSource(collector);
 		avroSource.start();
 		while (!finished) {
@@ -138,4 +138,8 @@ public class FlumeSource<OUT> extends ConnectorSource<OUT> {
 		}
 	}
 
+	@Override
+	public void cancel() {
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 2c2bf80..1cd1192 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -41,7 +41,7 @@ public class KafkaProducerExample {
 		@SuppressWarnings({ "unused", "serial" })
 		DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
 			@Override
-			public void invoke(Collector<String> collector) throws Exception {
+			public void run(Collector<String> collector) throws Exception {
 				for (int i = 0; i < 100; i++) {
 					collector.collect("message #" + i);
 					Thread.sleep(100L);
@@ -49,6 +49,12 @@ public class KafkaProducerExample {
 
 				collector.collect(new String("q"));
 			}
+
+			@Override
+			public void cancel() {				
+			}
+			
+			
 		}).addSink(
 				new KafkaSink<String>(topic, host + ":" + port, new JavaDefaultStringSchema())
 		)

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 5324480..d14772b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -33,9 +33,9 @@ import org.apache.flink.streaming.connectors.util.SerializationSchema;
 
 /**
  * Sink that emits its inputs to a Kafka topic.
- *
+ * 
  * @param <IN>
- * 		Type of the sink input
+ *            Type of the sink input
  */
 public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
@@ -49,14 +49,15 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	private KafkaPartitioner<IN> partitioner;
 
 	/**
-	 * Creates a KafkaSink for a given topic. The partitioner distributes the messages between the partitions of the topics.
-	 *
+	 * Creates a KafkaSink for a given topic. The partitioner distributes the
+	 * messages between the partitions of the topics.
+	 * 
 	 * @param topicId
-	 * 		ID of the Kafka topic.
+	 *            ID of the Kafka topic.
 	 * @param brokerAddr
-	 * 		Address of the Kafka broker (with port number).
+	 *            Address of the Kafka broker (with port number).
 	 * @param serializationSchema
-	 * 		User defined serialization schema.
+	 *            User defined serialization schema.
 	 */
 	public KafkaSink(String topicId, String brokerAddr,
 			SerializationSchema<IN, byte[]> serializationSchema) {
@@ -64,16 +65,17 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	}
 
 	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input into the topic.
-	 *
+	 * Creates a KafkaSink for a given topic. The sink produces its input into
+	 * the topic.
+	 * 
 	 * @param topicId
-	 * 		ID of the Kafka topic.
+	 *            ID of the Kafka topic.
 	 * @param brokerAddr
-	 * 		Address of the Kafka broker (with port number).
+	 *            Address of the Kafka broker (with port number).
 	 * @param serializationSchema
-	 * 		User defined serialization schema.
+	 *            User defined serialization schema.
 	 * @param partitioner
-	 * 		User defined partitioner.
+	 *            User defined partitioner.
 	 */
 	public KafkaSink(String topicId, String brokerAddr,
 			SerializationSchema<IN, byte[]> serializationSchema, KafkaPartitioner<IN> partitioner) {
@@ -111,9 +113,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Kafka.
-	 *
+	 * 
 	 * @param next
-	 * 		The incoming data
+	 *            The incoming data
 	 */
 	@Override
 	public void invoke(IN next) {
@@ -132,4 +134,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 		}
 	}
 
+	@Override
+	public void cancel() {
+		close();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 7a185bb..f4097e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -53,6 +53,8 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	private long zookeeperSyncTimeMillis;
 	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
 
+	private volatile boolean isRunning = false;
+
 	/**
 	 * Creates a KafkaSource that consumes a topic.
 	 * 
@@ -107,21 +109,31 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	 *            The Collector for sending data to the dataStream
 	 */
 	@Override
-	public void invoke(Collector<OUT> collector) throws Exception {
-
-		while (consumerIterator.hasNext()) {
-			OUT out = schema.deserialize(consumerIterator.next().message());
-			if (schema.isEndOfStream(out)) {
-				break;
+	public void run(Collector<OUT> collector) throws Exception {
+		isRunning = true;
+		try {
+			while (isRunning && consumerIterator.hasNext()) {
+				OUT out = schema.deserialize(consumerIterator.next().message());
+				if (schema.isEndOfStream(out)) {
+					break;
+				}
+				collector.collect(out);
 			}
-			collector.collect(out);
+		} finally {
+			consumer.shutdown();
 		}
-		consumer.shutdown();
-
 	}
 
 	@Override
-	public void open(Configuration config) {
+	public void open(Configuration config) throws Exception {
 		initializeConnection();
 	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+		if (consumer != null) {
+			consumer.shutdown();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 38c4f5f..dae9c6d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -108,4 +108,9 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		closeChannel();
 	}
 
+	@Override
+	public void cancel() {
+		close();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 7ce864e..12ad3d6 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -46,6 +46,8 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
 	private transient QueueingConsumer consumer;
 	private transient QueueingConsumer.Delivery delivery;
 
+	private volatile boolean isRunning = false;
+
 	OUT out;
 
 	public RMQSource(String HOST_NAME, String QUEUE_NAME,
@@ -80,42 +82,46 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
 	 *            The Collector for sending data to the dataStream
 	 */
 	@Override
-	public void invoke(Collector<OUT> collector) throws Exception {
-
-		while (true) {
-
-			try {
-				delivery = consumer.nextDelivery();
-			} catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+	public void run(Collector<OUT> collector) throws Exception {
+		isRunning = true;
+		try {
+			while (isRunning) {
+
+				try {
+					delivery = consumer.nextDelivery();
+				} catch (Exception e) {
+					if (LOG.isErrorEnabled()) {
+						LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+					}
 				}
-			}
 
-			out = schema.deserialize(delivery.getBody());
-			if (schema.isEndOfStream(out)) {
-				break;
-			} else {
-				collector.collect(out);
+				out = schema.deserialize(delivery.getBody());
+				if (schema.isEndOfStream(out)) {
+					break;
+				} else {
+					collector.collect(out);
+				}
 			}
+		} finally {
+			connection.close();
 		}
 
 	}
 
 	@Override
-	public void open(Configuration config) {
+	public void open(Configuration config) throws Exception {
 		initializeConnection();
 	}
 
 	@Override
-	public void close() {
+	public void cancel() {
+		isRunning = false;
 		try {
 			connection.close();
 		} catch (IOException e) {
 			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
 					+ " at " + HOST_NAME, e);
 		}
-
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index ddb2538..740907f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -57,6 +57,8 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	private boolean streaming;
 	private int numberOfTweets;
 
+	private volatile boolean isRunning = false;
+
 	/**
 	 * Create {@link TwitterSource} for streaming
 	 * 
@@ -90,20 +92,20 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	}
 
 	@Override
-	public void invoke(Collector<String> collector) throws Exception {
-
-		if (streaming) {
-			collectMessages(collector);
-		} else {
-			collectFiniteMessages(collector);
+	public void run(Collector<String> collector) throws Exception {
+		isRunning = true;
+		try {
+			if (streaming) {
+				collectMessages(collector);
+			} else {
+				collectFiniteMessages(collector);
+			}
+		} finally {
+			closeConnection();
+			isRunning = false;
 		}
 	}
 
-	@Override
-	public void close() throws Exception {
-		closeConnection();
-	}
-
 	/**
 	 * Initialize Hosebird Client to be able to consume Twitter's Streaming API
 	 */
@@ -196,7 +198,7 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 			LOG.info("Tweet-stream begins");
 		}
 
-		while (true) {
+		while (isRunning) {
 			collectOneMessage(collector);
 		}
 	}
@@ -246,7 +248,8 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	/**
 	 * Get the size of the queue in which the tweets are contained temporarily.
 	 * 
-	 * @return the size of the queue in which the tweets are contained temporarily
+	 * @return the size of the queue in which the tweets are contained
+	 *         temporarily
 	 */
 	public int getQueueSize() {
 		return queueSize;
@@ -280,4 +283,10 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	public void setWaitSec(int waitSec) {
 		this.waitSec = waitSec;
 	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+		closeConnection();
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index a32fe1b..9be27eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -47,6 +47,10 @@ public class TwitterStreaming {
 			System.out.println("");
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	public static class SelectDataFlatMap extends

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
index 24beba1..5468494 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -115,4 +115,13 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
 	 */
 	protected abstract void resetParameters();
 
+	@Override
+	public void cancel() {
+		try {
+			close();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index d460749..0fa37ac 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -84,10 +84,9 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
 	}
 	
 	@Override
-	public void close() throws Exception {
+	public void close() {
 		this.stream = null;
 		this.prefix = null;
-		super.close();
 	}
 	
 	@Override
@@ -95,4 +94,9 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
 		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
 	}
 
+	@Override
+	public void cancel() {
+		close();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 6097603..05ae34d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.functions.Function;
 
 public interface SinkFunction<IN> extends Function, Serializable {
 
-	public abstract void invoke(IN value) throws Exception;
+	public void invoke(IN value) throws Exception;
+
+	public void cancel();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index ee6df94..53030f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -47,4 +47,9 @@ public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
 		lastTime = System.currentTimeMillis();
 	}
 
+	@Override
+	public void cancel() {
+		// No cleanup needed
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
index 05a2489..2a84c0e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
@@ -39,8 +39,10 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 
 	public enum WatchType {
 		ONLY_NEW_FILES, // Only new files will be processed.
-		REPROCESS_WITH_APPENDED, // When some files are appended, all contents of the files will be processed.
-		PROCESS_ONLY_APPENDED // When some files are appended, only appended contents will be processed.
+		REPROCESS_WITH_APPENDED, // When some files are appended, all contents
+									// of the files will be processed.
+		PROCESS_ONLY_APPENDED // When some files are appended, only appended
+								// contents will be processed.
 	}
 
 	private String path;
@@ -51,6 +53,8 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 	private Map<String, Long> offsetOfFiles;
 	private Map<String, Long> modificationTimes;
 
+	private volatile boolean isRunning = false;
+
 	public FileMonitoringFunction(String path, long interval, WatchType watchType) {
 		this.path = path;
 		this.interval = interval;
@@ -60,10 +64,11 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 	}
 
 	@Override
-	public void invoke(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+	public void run(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+		isRunning = true;
 		fileSystem = FileSystem.get(new URI(path));
 
-		while (true) {
+		while (isRunning) {
 			List<String> files = listNewFiles();
 			for (String filePath : files) {
 				if (watchType == WatchType.ONLY_NEW_FILES
@@ -120,4 +125,9 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 			}
 		}
 	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index dcf67a9..d7df266 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -38,6 +38,8 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
 
 	private TypeInformation<String> typeInfo;
 
+	private volatile boolean isRunning;
+
 	public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
 		this.inputFormat = format;
 		this.typeInfo = typeInfo;
@@ -51,33 +53,32 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
 	}
 
 	@Override
-	public void invoke(Collector<String> collector) throws Exception {
+	public void run(Collector<String> collector) throws Exception {
+		isRunning = true;
 		final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext()
 				.getExecutionConfig());
 		final Iterator<InputSplit> splitIterator = getInputSplits();
 		@SuppressWarnings("unchecked")
 		final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
 		try {
-			while (splitIterator.hasNext()) {
+			while (isRunning && splitIterator.hasNext()) {
 
 				final InputSplit split = splitIterator.next();
 				String record = serializer.createInstance();
 
 				format.open(split);
-				try {
-					while (!format.reachedEnd()) {
-						if ((record = format.nextRecord(record)) != null) {
-							collector.collect(record);
-						}
+				while (!format.reachedEnd()) {
+					if ((record = format.nextRecord(record)) != null) {
+						collector.collect(record);
 					}
-				} finally {
-					format.close();
 				}
+
 			}
 			collector.close();
-		} catch (Exception ex) {
-			ex.printStackTrace();
+		} finally {
+			format.close();
 		}
+		isRunning = false;
 	}
 
 	private Iterator<InputSplit> getInputSplits() {
@@ -126,4 +127,9 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
 			}
 		};
 	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index 8afac75..97a3a92 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -40,10 +40,14 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 	}
 
 	@Override
-	public void invoke(Collector<T> collector) throws Exception {
+	public void run(Collector<T> collector) throws Exception {
 		for (T element : iterable) {
 			collector.collect(element);
 		}
 	}
 
+	@Override
+	public void cancel() {
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index 3afd06e..eccc146 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -37,7 +37,7 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
 	}
 
 	@Override
-	public void invoke(Collector<Long> collector) throws Exception {
+	public void run(Collector<Long> collector) throws Exception {
 		while (splitIterator.hasNext()) {
 			collector.collect(splitIterator.next());
 		}
@@ -50,4 +50,8 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
 		splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
 	}
 
+	@Override
+	public void cancel() {
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
index 3253c01..67bc128 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -18,10 +18,12 @@
 package org.apache.flink.streaming.api.function.source;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
@@ -33,7 +35,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	protected static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
 
 	private static final long serialVersionUID = 1L;
-	
+
 	private String hostname;
 	private int port;
 	private char delimiter;
@@ -43,6 +45,8 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	private static final int CONNECTION_TIMEOUT_TIME = 0;
 	private static final int CONNECTION_RETRY_SLEEP = 1000;
 
+	private volatile boolean isRunning = false;
+
 	public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
 		this.hostname = hostname;
 		this.port = port;
@@ -55,65 +59,91 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		socket = new Socket();
-		
+
 		socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
 	}
-	
+
 	@Override
-	public void invoke(Collector<String> collector) throws Exception {
+	public void run(Collector<String> collector) throws Exception {
 		streamFromSocket(collector, socket);
 	}
 
 	public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
-		StringBuffer buffer = new StringBuffer();
-		BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+		isRunning = true;
+		try {
+			StringBuffer buffer = new StringBuffer();
+			BufferedReader reader = new BufferedReader(new InputStreamReader(
+					socket.getInputStream()));
 
-		while (true) {
-			int data = reader.read();
-			if (data == -1) {
-				socket.close();
-				long retry = 0;
-				boolean success = false;
-				while (retry < maxRetry && !success) {
-					if (!retryForever) {
-						retry++;
+			while (isRunning) {
+				int data;
+				try {
+					data = reader.read();
+				} catch (SocketException e) {
+					if (!isRunning) {
+						break;
+					} else {
+						throw e;
 					}
-					LOG.warn("Lost connection to server socket. Retrying in " + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
-					try {
-						socket = new Socket();
-						socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
-						success = true;
-					} catch (ConnectException ce) {
-						Thread.sleep(CONNECTION_RETRY_SLEEP);
+				}
+
+				if (data == -1) {
+					socket.close();
+					long retry = 0;
+					boolean success = false;
+					while (retry < maxRetry && !success) {
+						if (!retryForever) {
+							retry++;
+						}
+						LOG.warn("Lost connection to server socket. Retrying in "
+								+ (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
+						try {
+							socket = new Socket();
+							socket.connect(new InetSocketAddress(hostname, port),
+									CONNECTION_TIMEOUT_TIME);
+							success = true;
+						} catch (ConnectException ce) {
+							Thread.sleep(CONNECTION_RETRY_SLEEP);
+						}
 					}
+
+					if (success) {
+						LOG.info("Server socket is reconnected.");
+					} else {
+						LOG.error("Could not reconnect to server socket.");
+						break;
+					}
+					reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+					continue;
 				}
 
-				if (success) {
-					LOG.info("Server socket is reconnected.");
-				} else {
-					LOG.error("Could not reconnect to server socket.");
-					break;
+				if (data == delimiter) {
+					collector.collect(buffer.toString());
+					buffer = new StringBuffer();
+				} else if (data != '\r') { // ignore carriage return
+					buffer.append((char) data);
 				}
-				reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-				continue;
 			}
 
-			if (data == delimiter) {
+			if (buffer.length() > 0) {
 				collector.collect(buffer.toString());
-				buffer = new StringBuffer();
-			} else if (data != '\r') { // ignore carriage return
-				buffer.append((char) data);
 			}
-		}
-
-		if (buffer.length() > 0) {
-			collector.collect(buffer.toString());
+		} finally {
+			socket.close();
 		}
 	}
 
 	@Override
-	public void close() throws Exception {
-		socket.close();
-		super.close();
+	public void cancel() {
+		isRunning = false;
+		if (socket != null && !socket.isClosed()) {
+			try {
+				socket.close();
+			} catch (IOException e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Could not close open socket");
+				}
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 917562a..4f579fe 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -24,6 +24,8 @@ import org.apache.flink.util.Collector;
 
 public interface SourceFunction<OUT> extends Function, Serializable {
 
-	public void invoke(Collector<OUT> collector) throws Exception;
+	public void run(Collector<OUT> collector) throws Exception;
+	
+	public void cancel();
 		
-}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 13a6ba1..35060fd 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -31,7 +31,7 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -47,4 +47,10 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
 		callUserFunctionAndLogException();
 	}
 
+	@Override
+	public void cancel() {
+		super.cancel();
+		sinkFunction.cancel();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index f1cf2c5..c3f25a0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -39,6 +39,12 @@ public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements S
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		sourceFunction.invoke(collector);
+		sourceFunction.run(collector);
+	}
+
+	@Override
+	public void cancel() {
+		super.cancel();
+		sourceFunction.cancel();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index db7b642..85fb9a4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -107,7 +107,13 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 			}
 			return nextRecord;
 		} catch (IOException e) {
-			throw new RuntimeException("Could not read next record.");
+			if (isRunning) {
+				throw new RuntimeException("Could not read next record due to: "
+						+ StringUtils.stringifyException(e));
+			} else {
+				// Task already cancelled; do nothing
+				return null;
+			}
 		}
 	}
 
@@ -159,6 +165,10 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 		}
 	}
 
+	public void cancel() {
+		isRunning = false;
+	}
+
 	public void setRuntimeContext(RuntimeContext t) {
 		FunctionUtils.setFunctionRuntimeContext(userFunction, t);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
index 3fc314c..8bb546c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -30,14 +30,16 @@ public class CounterInvokable<IN> extends ChainableInvokable<IN, Long> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			collector.collect(++count);
 		}
 	}
 
 	@Override
 	public void collect(IN record) {
-		collector.collect(++count);
+		if (isRunning) {
+			collector.collect(++count);
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 0c8298e..ab3f147 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -34,7 +34,7 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -49,7 +49,9 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void collect(IN record) {
-		nextObject = copy(record);
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 2a4081b..025bd32 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -32,7 +32,7 @@ public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -44,8 +44,10 @@ public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 
 	@Override
 	public void collect(IN record) {
-		nextObject = copy(record);
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 7c8e577..8fc1f13 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -32,7 +32,7 @@ public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -44,7 +44,9 @@ public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 
 	@Override
 	public void collect(IN record) {
-		nextObject = copy(record);
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 31689c7..3e47107 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -41,7 +41,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index fe6c41a..e7fa2b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -35,7 +35,7 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			reduce();
 		}
 	}
@@ -62,8 +62,10 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void collect(IN record) {
-		nextObject = copy(record);
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 604873e..b41dbbb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
+import java.io.IOException;
+
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -76,8 +78,19 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
 
 	@Override
 	public void invoke() throws Exception {
-		while (true) {
-			int next = recordIterator.next(reuse1, reuse2);
+		while (isRunning) {
+			int next;
+			try {
+				next = recordIterator.next(reuse1, reuse2);
+			} catch (IOException e) {
+				if (isRunning) {
+					throw e;
+				} else {
+					// Task already cancelled; do nothing
+					next = 0;
+				}
+			}
+
 			if (next == 0) {
 				break;
 			} else if (next == 1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
index 5e21a31..f14a6ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -60,11 +60,8 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 
 	@Override
 	public void invoke() throws Exception {
-		if (readNext() == null) {
-			throw new RuntimeException("DataStream must not be empty");
-		}
 
-		while (nextRecord != null) {
+		while (isRunning && readNext() != null) {
 
 			Object key = keySelector.getKey(nextObject);
 
@@ -76,8 +73,6 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 			}
 
 			groupDiscretizer.processRealElement(nextObject);
-
-			readNext();
 		}
 
 		for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
index 53c87c3..2c3bd75 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
@@ -42,7 +42,7 @@ public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -64,8 +64,10 @@ public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> {
 
 	@Override
 	public void collect(WindowEvent<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index 104196e..e668b66 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -71,7 +71,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
 	public void invoke() throws Exception {
 
 		// Continuously run
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			processRealElement(nextObject);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index ea4b830..75f7d9d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -26,8 +26,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
  * This invokable flattens the results of the window transformations by
  * outputing the elements of the {@link StreamWindow} one-by-one
  */
-public class WindowBufferInvokable<T> extends
-		ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
+public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
 
 	protected WindowBuffer<T> buffer;
 
@@ -40,7 +39,7 @@ public class WindowBufferInvokable<T> extends
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -67,8 +66,10 @@ public class WindowBufferInvokable<T> extends
 
 	@Override
 	public void collect(WindowEvent<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
index edefeef..0ff4724 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
@@ -34,7 +34,7 @@ public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -48,8 +48,10 @@ public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
 
 	@Override
 	public void collect(StreamWindow<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
index a58bb9f..f425255 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
@@ -41,7 +41,7 @@ public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamW
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -69,7 +69,9 @@ public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamW
 
 	@Override
 	public void collect(StreamWindow<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
index e010af4..846650d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
@@ -44,7 +44,7 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -71,8 +71,10 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
 
 	@Override
 	public void collect(StreamWindow<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index bd25e72..99ca098 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -154,6 +154,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	}
 
 	@Override
+	public void cancel() {
+		if (userInvokable != null) {
+			userInvokable.cancel();
+		}
+	}
+
+	@Override
 	public StreamConfig getConfig() {
 		return configuration;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 6ad827a..92d23aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -69,6 +69,10 @@ public class IterateTest {
 		public void invoke(Boolean tuple) {
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index 2486715..a214fbf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -76,6 +76,10 @@ public class OutputSplitterTest {
 			public void invoke(Integer value) {
 				splitterResult1.add(value);
 			}
+			
+			@Override
+			public void cancel() {
+			}
 		});
 
 		d1.split(new OutputSelector<Integer>() {
@@ -98,6 +102,10 @@ public class OutputSplitterTest {
 			public void invoke(Integer value) {
 				splitterResult2.add(value);
 			}
+			
+			@Override
+			public void cancel() {
+			}
 		});
 		env.execute();
 
@@ -144,6 +152,10 @@ public class OutputSplitterTest {
 			public void invoke(Integer value) {
 				splitterResult1.add(value);
 			}
+			
+			@Override
+			public void cancel() {
+			}
 		});
 
 		ds.split(new OutputSelector<Integer>() {
@@ -168,6 +180,10 @@ public class OutputSplitterTest {
 			public void invoke(Integer value) {
 				splitterResult2.add(value);
 			}
+			
+			@Override
+			public void cancel() {
+			}
 		});
 		env.execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index dc4932e..e14e281 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -143,6 +143,10 @@ public class WindowCrossJoinTest implements Serializable {
 		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
 			joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
 		}
+		
+		@Override
+		public void cancel() {
+		}
 	}
 
 	private static class CrossResultSink implements
@@ -153,5 +157,9 @@ public class WindowCrossJoinTest implements Serializable {
 		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
 			crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
 		}
+		
+		@Override
+		public void cancel() {
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 38bba5e..9d166e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -88,6 +88,10 @@ public class DirectedOutputTest {
 			outputs.put(name, new ArrayList<Long>());
 			this.list = outputs.get(name);
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 
 	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 3163c46..2ed0002 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -208,6 +208,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -221,6 +225,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -234,6 +242,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -247,6 +259,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -260,6 +276,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -273,6 +293,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -286,5 +310,9 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index 4f01a8b..18a36ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -50,12 +50,17 @@ public class StreamVertexTest {
 		private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
 
 		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+		public void run(Collector<Tuple1<Integer>> collector) throws Exception {
 			for (int i = 0; i < 10; i++) {
 				tuple.f0 = i;
 				collector.collect(tuple);
 			}
 		}
+
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
@@ -77,6 +82,10 @@ public class StreamVertexTest {
 			Integer v = tuple.getField(1);
 			data.put(k, v);
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 
 	@SuppressWarnings("unused")
@@ -142,6 +151,10 @@ public class StreamVertexTest {
 		public void invoke(String value) {
 			result.add(value);
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
index bb92e8e..4cf02ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
@@ -27,7 +27,7 @@ public class MockSource<T> {
 	public static <T> List<T> createAndExecute(SourceFunction<T> source) {
 		List<T> outputs = new ArrayList<T>();
 		try {
-			source.invoke(new MockCollector<T>(outputs));
+			source.run(new MockCollector<T>(outputs));
 		} catch (Exception e) {
 			throw new RuntimeException("Cannot invoke source.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index dcfed50..a5a9577 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -108,7 +108,7 @@ public class WindowJoin {
 		}
 
 		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+		public void run(Collector<Tuple2<String, Integer>> out) throws Exception {
 			while (true) {
 				outTuple.f0 = names[rand.nextInt(names.length)];
 				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
@@ -116,6 +116,11 @@ public class WindowJoin {
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
 			}
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	/**
@@ -134,7 +139,7 @@ public class WindowJoin {
 		}
 
 		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+		public void run(Collector<Tuple2<String, Integer>> out) throws Exception {
 			while (true) {
 				outTuple.f0 = names[rand.nextInt(names.length)];
 				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
@@ -142,6 +147,11 @@ public class WindowJoin {
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
 			}
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	public static class MyJoinFunction

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 4cdb7c6..26895f2 100755
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -93,7 +93,7 @@ public class IncrementalLearningSkeleton {
 		private static final int NEW_DATA_SLEEP_TIME = 1000;
 
 		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
+		public void run(Collector<Integer> collector) throws Exception {
 			while (true) {
 				collector.collect(getNewData());
 			}
@@ -103,6 +103,11 @@ public class IncrementalLearningSkeleton {
 			Thread.sleep(NEW_DATA_SLEEP_TIME);
 			return 1;
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	/**
@@ -114,7 +119,7 @@ public class IncrementalLearningSkeleton {
 		private static final int TRAINING_DATA_SLEEP_TIME = 10;
 
 		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
+		public void run(Collector<Integer> collector) throws Exception {
 			while (true) {
 				collector.collect(getTrainingData());
 			}
@@ -126,6 +131,11 @@ public class IncrementalLearningSkeleton {
 			return 1;
 
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index 9bf851e..ec99026 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -267,7 +267,7 @@ public class StockPrices {
 		}
 
 		@Override
-		public void invoke(Collector<StockPrice> collector) throws Exception {
+		public void run(Collector<StockPrice> collector) throws Exception {
 			price = DEFAULT_PRICE;
 			Random random = new Random();
 
@@ -277,6 +277,11 @@ public class StockPrices {
 				Thread.sleep(random.nextInt(200));
 			}
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	public final static class WindowMean implements WindowMapFunction<StockPrice, StockPrice> {
@@ -307,7 +312,7 @@ public class StockPrices {
 		StringBuilder stringBuilder;
 
 		@Override
-		public void invoke(Collector<String> collector) throws Exception {
+		public void run(Collector<String> collector) throws Exception {
 			random = new Random();
 			stringBuilder = new StringBuilder();
 
@@ -322,6 +327,11 @@ public class StockPrices {
 			}
 
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	public static final class SendWarning implements WindowMapFunction<StockPrice, String> {

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index 5e73fd6..311c6b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -87,7 +87,7 @@ public class TopSpeedWindowingExample {
 		}
 
 		@Override
-		public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
+		public void run(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
 				throws Exception {
 
 			while (true) {
@@ -104,6 +104,10 @@ public class TopSpeedWindowingExample {
 				}
 			}
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 
 	private static int numOfCars = 2;

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3ab4ff1..d4df1d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -568,6 +568,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val sinkFunction = new SinkFunction[T] {
       val cleanFun = clean(fun)
       def invoke(in: T) = cleanFun(in)
+      def cancel() = {}
     }
     this.addSink(sinkFunction)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 00d3704..1212b2b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -224,9 +224,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     Validate.notNull(function, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
       val cleanFun = StreamExecutionEnvironment.clean(function)
-      override def invoke(out: Collector[T]) {
+      override def run(out: Collector[T]) {
         cleanFun(out)
       }
+      override def cancel() = {}
     }
     addSource(sourceFunction)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index f7f9eae..18b52c5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -18,17 +18,15 @@
 
 package org.apache.flink.test.classloading.jar;
 
+import java.util.StringTokenizer;
+
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.util.Collector;
 
-import java.util.StringTokenizer;
-
 @SuppressWarnings("serial")
 public class StreamingProgram {
 	
@@ -100,5 +98,9 @@ public class StreamingProgram {
 		@Override
 		public void invoke(Word value) throws Exception {
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 }


Mime
View raw message