flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [4/4] flink git commit: [FLINK-1763] [streaming] Remove cancel from SinkFunction
Date Sun, 22 Mar 2015 17:35:28 GMT
[FLINK-1763] [streaming] Remove cancel from SinkFunction

This closes #513


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35f34162
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35f34162
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35f34162

Branch: refs/heads/master
Commit: 35f34162a1ba3bf3fd590d7b0e59011ad9f6160f
Parents: 2842e2f
Author: Gyula Fora <gyfora@apache.org>
Authored: Sun Mar 22 13:44:27 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Mar 22 16:44:09 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/flume/FlumeSink.java   |  7 ----
 .../connectors/kafka/api/KafkaSink.java         |  6 +--
 .../connectors/kafka/api/KafkaSource.java       |  1 +
 .../api/simple/offset/BeginningOffset.java      |  2 +
 .../kafka/api/simple/offset/CurrentOffset.java  |  2 +
 .../kafka/api/simple/offset/GivenOffset.java    |  1 +
 .../kafka/api/simple/offset/KafkaOffset.java    |  2 +
 .../streaming/connectors/rabbitmq/RMQSink.java  |  5 ---
 .../connectors/twitter/TwitterStreaming.java    |  4 --
 .../streaming/connectors/kafka/KafkaITCase.java |  7 ++--
 .../api/function/sink/FileSinkFunction.java     |  9 ----
 .../api/function/sink/PrintSinkFunction.java    |  6 ---
 .../api/function/sink/SinkFunction.java         |  7 ----
 .../api/function/sink/SocketClientSink.java     |  8 ----
 .../sink/WriteSinkFunctionByMillis.java         |  5 ---
 .../streaming/api/invokable/SinkInvokable.java  |  7 ----
 .../api/streamvertex/StreamVertex.java          | 43 +++++++++++++++-----
 .../streaming/io/SpillingBufferOrEvent.java     |  1 +
 .../apache/flink/streaming/api/IterateTest.java |  5 ---
 .../api/collector/DirectedOutputTest.java       |  6 +--
 .../windowing/WindowIntegrationTest.java        | 40 ------------------
 .../api/streamvertex/StreamVertexTest.java      | 10 +----
 .../streaming/util/TestListResultSink.java      |  6 ---
 .../flink/streaming/api/scala/DataStream.scala  |  1 -
 .../StreamCheckpointingITCase.java              |  4 +-
 .../test/classloading/jar/StreamingProgram.java |  4 --
 26 files changed, 50 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 27074ec..8112159 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -134,13 +134,6 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	}
 
 	@Override
-	public void cancel() {
-		if (client != null) {
-			client.client.close();
-		}
-	}
-
-	@Override
 	public void open(Configuration config) {
 		client = new FlinkRpcClientFacade();
 		client.init(host, port);

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index be9eb57..f1dbc8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -65,6 +65,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * @param serializationSchema
 	 * 		User defined serialization schema.
 	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public KafkaSink(String zookeeperAddress, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema) {
 		this(zookeeperAddress, topicId, serializationSchema, (Class) null);
@@ -161,9 +162,4 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 		}
 	}
 
-	@Override
-	public void cancel() {
-		close();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index ae6c169..1aa834d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -46,6 +46,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
+	@SuppressWarnings("unused")
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
 
 	private final String zookeeperAddress;

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
index f7096ad..15e7b36 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
@@ -22,6 +22,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
 
 public class BeginningOffset extends KafkaOffset {
 
+	private static final long serialVersionUID = 1L;
+
 	@Override
 	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName)
{
 		return getLastOffset(consumer, topic, partition, OffsetRequest.EarliestTime(), clientName);

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
index 3555ff9..6119f32 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
@@ -22,6 +22,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
 
 public class CurrentOffset extends KafkaOffset {
 
+	private static final long serialVersionUID = 1L;
+
 	@Override
 	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName)
{
 		return getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName);

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
index 1282125..fef6325 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
@@ -21,6 +21,7 @@ import kafka.javaapi.consumer.SimpleConsumer;
 
 public class GivenOffset extends KafkaOffset {
 
+	private static final long serialVersionUID = 1L;
 	private final long offset;
 
 	public GivenOffset(long offset) {

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
index c048ba1..4dfd314 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
@@ -28,6 +28,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
 
 public abstract class KafkaOffset implements Serializable {
 
+	private static final long serialVersionUID = 1L;
+
 	public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
 			String clientName);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 48f5e60..53db1c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -108,9 +108,4 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		closeChannel();
 	}
 
-	@Override
-	public void cancel() {
-		close();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index 9be27eb..a32fe1b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -47,10 +47,6 @@ public class TwitterStreaming {
 			System.out.println("");
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	public static class SelectDataFlatMap extends

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index ab786ee..a094b89 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -154,14 +154,11 @@ public class KafkaITCase {
 					throw new SuccessException();
 				}
 			}
-
-			@Override
-			public void cancel() {
-			}
 		});
 
 		// add producing topology
 		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+			private static final long serialVersionUID = 1L;
 			boolean running = true;
 
 			@Override
@@ -247,6 +244,8 @@ public class KafkaITCase {
 
 	public static class SuccessException extends Exception {
 
+		private static final long serialVersionUID = 1L;
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
index 5468494..24beba1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -115,13 +115,4 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN>
{
 	 */
 	protected abstract void resetParameters();
 
-	@Override
-	public void cancel() {
-		try {
-			close();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index 0fa37ac..9ff8a7f 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -93,10 +93,4 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN>
{
 	public String toString() {
 		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
 	}
-
-	@Override
-	public void cancel() {
-		close();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index ffa5a67..d4ce24e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -35,11 +35,4 @@ public interface SinkFunction<IN> extends Function, Serializable
{
 	 * @throws Exception
 	 */
 	public void invoke(IN value) throws Exception;
-
-	/**
-	 * In case another vertex in topology fails this method is called before terminating
-	 * the sink. Make sure to free up any allocated resources here.
-	 */
-	public void cancel();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
index 6ebcf46..c582c1b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
@@ -127,12 +127,4 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN>
{
 		closeConnection();
 	}
 
-	/**
-	 * Closes the connection with the Socket server.
-	 */
-	@Override
-	public void cancel() {
-		close();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index 53030f4..ee6df94 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -47,9 +47,4 @@ public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN>
{
 		lastTime = System.currentTimeMillis();
 	}
 
-	@Override
-	public void cancel() {
-		// No cleanup needed
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 01a295a..29d2ed2 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -40,11 +40,4 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN,
IN> {
 	protected void callUserFunction() throws Exception {
 		sinkFunction.invoke(nextObject);
 	}
-
-	@Override
-	public void cancel() {
-		super.cancel();
-		sinkFunction.cancel();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index dd8a463..ae2ebdd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -29,9 +29,9 @@ import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.jobmanager.BarrierAck;
 import org.apache.flink.runtime.jobmanager.StateBarrierAck;
 import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -152,32 +152,36 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements
StreamTa
 	@Override
 	public void invoke() throws Exception {
 
+		boolean operatorOpen = false;
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID());
 		}
 
 		try {
 			userInvokable.setRuntimeContext(context);
-			userInvokable.open(getTaskConfiguration());
 
-			for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-				invokable.setRuntimeContext(context);
-				invokable.open(getTaskConfiguration());
-			}
+			operatorOpen = true;
+			openOperator();
 
 			userInvokable.invoke();
 
-			userInvokable.close();
-
-			for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-				invokable.close();
-			}
+			closeOperator();
+			operatorOpen = false;
 
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID());
 			}
 
 		} catch (Exception e) {
+
+			if (operatorOpen) {
+				try {
+					closeOperator();
+				} catch (Throwable t) {
+				}
+			}
+
 			if (LOG.isErrorEnabled()) {
 				LOG.error("StreamInvokable failed due to: {}", StringUtils.stringifyException(e));
 			}
@@ -190,6 +194,23 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements
StreamTa
 
 	}
 
+	protected void openOperator() throws Exception {
+		userInvokable.open(getTaskConfiguration());
+
+		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+			invokable.setRuntimeContext(context);
+			invokable.open(getTaskConfiguration());
+		}
+	}
+
+	protected void closeOperator() throws Exception {
+		userInvokable.close();
+
+		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+			invokable.close();
+		}
+	}
+
 	protected void clearBuffers() throws IOException {
 		if (outputHandler != null) {
 			outputHandler.clearWriters();

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
index 40713e2..38cc0c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
@@ -40,6 +40,7 @@ public class SpillingBufferOrEvent {
 			throws IOException {
 
 		this.boe = boe;
+		this.channelIndex = boe.getChannelIndex();
 		this.spillReader = reader;
 
 		if (shouldSpill()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 92d23aa..3f0c48a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -68,11 +68,6 @@ public class IterateTest {
 		@Override
 		public void invoke(Boolean tuple) {
 		}
-
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 62872fd..bc7fe73 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -88,9 +88,6 @@ public class DirectedOutputTest {
 			this.list = outputs.get(name);
 		}
 
-		@Override
-		public void cancel() {
-		}
 	}
 
 	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
@@ -115,6 +112,7 @@ public class DirectedOutputTest {
 		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
 		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
 				evenAndOddSink.getSortedResult());
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), allSink.getSortedResult());
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+				allSink.getSortedResult());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 34986c8..f0b5500 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -286,10 +286,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -303,10 +299,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -320,10 +312,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -337,10 +325,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -354,10 +338,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -371,10 +351,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -388,10 +364,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -405,10 +377,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -422,10 +390,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -439,10 +403,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index a88a60d..0c3ff83 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -83,9 +83,6 @@ public class StreamVertexTest {
 			data.put(k, v);
 		}
 
-		@Override
-		public void cancel() {
-		}
 	}
 
 	@SuppressWarnings("unused")
@@ -134,13 +131,13 @@ public class StreamVertexTest {
 
 		@Override
 		public String map1(String value) {
-//			System.out.println(value);
+			// System.out.println(value);
 			return value;
 		}
 
 		@Override
 		public String map2(Long value) {
-//			System.out.println(value);
+			// System.out.println(value);
 			return value.toString();
 		}
 	}
@@ -154,9 +151,6 @@ public class StreamVertexTest {
 			result.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
index 8b78a42..87f290f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.util;
 
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TreeSet;
@@ -72,9 +71,4 @@ public class TestListResultSink<T> extends RichSinkFunction<T>
{
 			return sortedList;
 		}
 	}
-
-	@Override
-	public void cancel() {
-
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 7d77b5b..59f86b8 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -634,7 +634,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val sinkFunction = new SinkFunction[T] {
       val cleanFun = clean(fun)
       def invoke(in: T) = cleanFun(in)
-      def cancel() = {}
     }
     this.addSink(sinkFunction)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 3accb11..12c6b41 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -178,9 +178,7 @@ public class StreamCheckpointingITCase {
 							}
 						}
 
-						@Override
-						public void cancel() {}
-					});
+			});
 
 			env.execute();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 18b52c5..9a244a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -98,9 +98,5 @@ public class StreamingProgram {
 		@Override
 		public void invoke(Word value) throws Exception {
 		}
-
-		@Override
-		public void cancel() {
-		}
 	}
 }


Mime
View raw message