flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/7] flink git commit: [runtime] Improve error messages on Task deployment
Date Tue, 12 May 2015 12:01:30 GMT
[runtime] Improve error messages on Task deployment

This closes #615


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4152d75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4152d75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4152d75

Branch: refs/heads/master
Commit: b4152d75bf5236fbb3853b673af8e17f73cef5e2
Parents: a771cfb
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Apr 21 19:05:34 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/executiongraph/Execution.java   | 11 ++++++++---
 .../apache/flink/runtime/taskmanager/TaskManager.scala   |  2 ++
 2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4152d75/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 93b4f2f..4e046dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import scala.concurrent.Future;
@@ -330,7 +331,7 @@ public class Execution implements Serializable {
 			vertex.getExecutionGraph().registerExecution(this);
 
 			final Instance instance = slot.getInstance();
-			Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
+			final Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
 					new SubmitTask(deployment), new Timeout(timeout));
 
 			deployAction.onComplete(new OnComplete<Object>(){
@@ -339,9 +340,13 @@ public class Execution implements Serializable {
 				public void onComplete(Throwable failure, Object success) throws Throwable {
 					if (failure != null) {
 						if (failure instanceof TimeoutException) {
+							String taskname = Task.getTaskNameWithSubtaskAndID(deployment.getTaskName(),
+									deployment.getIndexInSubtaskGroup(), deployment.getNumberOfSubtasks(),
+									attemptId);
+							
 							markFailed(new Exception(
-									"Cannot deploy task - TaskManager " + instance + " not responding.",
-									failure));
+									"Cannot deploy task " + taskname + " - TaskManager (" + instance
+											+ ") not responding after a timeout of " + timeout, failure));
 						}
 						else {
 							markFailed(failure);

http://git-wip-us.apache.org/repos/asf/flink/blob/b4152d75/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index bdefea6..2e580cc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -771,6 +771,8 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       val execId = tdd.getExecutionId
       val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager,
                           self, jobManagerActor, config.timeout, libCache, fileCache)
+
+      log.info(s"Received task ${task.getTaskNameWithSubtasks}")
       
       // add the task to the map
       val prevTask = runningTasks.put(execId, task)


Mime
View raw message