flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [4/4] flink git commit: [FLINK-3896] Allow a StreamTask to be Externally Cancelled
Date Tue, 14 Jun 2016 16:12:38 GMT
[FLINK-3896] Allow a StreamTask to be Externally Cancelled

It adds a method failExternally() to the StreamTask, so that custom Operators
can make their containing task fail when needed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc19486c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc19486c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc19486c

Branch: refs/heads/master
Commit: bc19486ccfc4164d6abd9c712db8e92a350c5a85
Parents: fdf4360
Author: kl0u <kkloudas@gmail.com>
Authored: Tue May 10 18:56:58 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Jun 14 18:11:22 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/execution/Environment.java  | 11 +++++++++++
 .../runtime/taskmanager/RuntimeEnvironment.java      |  9 +++++++++
 .../org/apache/flink/runtime/taskmanager/Task.java   |  4 ++--
 .../operators/testutils/DummyEnvironment.java        |  5 +++++
 .../runtime/operators/testutils/MockEnvironment.java |  5 +++++
 .../flink/streaming/runtime/tasks/StreamTask.java    | 15 ++++++++++++++-
 .../runtime/tasks/StreamMockEnvironment.java         |  5 +++++
 7 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 121936c..9f779ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -166,6 +166,17 @@ public interface Environment {
 	 */
 	void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
 
+	/**
+	 * Marks task execution failed for an external reason (a reason other than the task code
itself
+	 * throwing an exception). If the task is already in a terminal state
+	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
+	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
+	 * starts an asynchronous thread that aborts that code.
+	 *
+	 * <p>This method never blocks.</p>
+	 */
+	void failExternally(Throwable cause);
+
 	// --------------------------------------------------------------------------------------------
 	//  Fields relevant to the I/O system. Should go into Task
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 1f93a0d..80c5fbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -78,6 +78,8 @@ public class RuntimeEnvironment implements Environment {
 	private final TaskManagerRuntimeInfo taskManagerInfo;
 	private final TaskMetricGroup metrics;
 
+	private final Task containingTask;
+
 	// ------------------------------------------------------------------------
 
 	public RuntimeEnvironment(
@@ -99,6 +101,7 @@ public class RuntimeEnvironment implements Environment {
 			InputGate[] inputGates,
 			ActorGateway jobManager,
 			TaskManagerRuntimeInfo taskManagerInfo,
+			Task containingTask,
 			TaskMetricGroup metrics) {
 
 		this.jobId = checkNotNull(jobId);
@@ -119,6 +122,7 @@ public class RuntimeEnvironment implements Environment {
 		this.inputGates = checkNotNull(inputGates);
 		this.jobManager = checkNotNull(jobManager);
 		this.taskManagerInfo = checkNotNull(taskManagerInfo);
+		this.containingTask = containingTask;
 		this.metrics = metrics;
 	}
 
@@ -262,4 +266,9 @@ public class RuntimeEnvironment implements Environment {
 
 		jobManager.tell(message);
 	}
+
+	@Override
+	public void failExternally(Throwable cause) {
+		this.containingTask.failExternally(cause);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1f766e1..c1cbaa6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -526,7 +526,7 @@ public class Task implements Runnable {
 					userCodeClassLoader, memoryManager, ioManager,
 					broadcastVariableManager, accumulatorRegistry,
 					splitProvider, distributedCacheEntries,
-					writers, inputGates, jobManager, taskManagerConfig, metrics);
+					writers, inputGates, jobManager, taskManagerConfig, this, metrics);
 
 			// let the task code create its readers and writers
 			invokable.setEnvironment(env);
@@ -703,7 +703,7 @@ public class Task implements Runnable {
 				LOG.error(message, t);
 				notifyFatalError(message, t);
 			}
-			
+
 			// un-register the metrics at the end so that the task may already be
 			// counted as finished when this happens
 			// errors here will only be logged

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 78fb422..063e295 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment {
 	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
 
 	@Override
+	public void failExternally(Throwable cause) {
+		throw new UnsupportedOperationException("DummyEnvironment does not support external task
failure.");
+	}
+
+	@Override
 	public ResultPartitionWriter getWriter(int index) {
 		return null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0220149..78e4cce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -289,4 +289,9 @@ public class MockEnvironment implements Environment {
 	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
 		throw new UnsupportedOperationException();
 	}
+
+	@Override
+	public void failExternally(Throwable cause) {
+		throw new UnsupportedOperationException("MockEnvironment does not support external task
failure.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 51904b3..a771c85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -295,7 +295,20 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			}
 		}
 	}
-	
+
+	/**
+	 * Marks task execution failed for an external reason (a reason other than the task code
itself
+	 * throwing an exception). If the task is already in a terminal state
+	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
+	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
+	 * starts an asynchronous thread that aborts that code.
+	 *
+	 * <p>This method never blocks.</p>
+	 */
+	public void failExternally(Throwable cause) {
+		getEnvironment().failExternally(cause);
+	}
+
 	@Override
 	public final void cancel() throws Exception {
 		isRunning = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index f8c36de..a8dd49b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -302,6 +302,11 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
+	public void failExternally(Throwable cause) {
+		throw new UnsupportedOperationException("StreamMockEnvironment does not support external
task failure.");
+	}
+
+	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
 		return new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
 	}


Mime
View raw message