tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-2100: Add missing cancellation in defaultTaskScheduler when a worker is no respond.
Date Fri, 18 Mar 2016 02:29:46 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.2 191faa0fd -> 1ec59c12a


TAJO-2100: Add missing cancellation in defaultTaskScheduler when a worker is no respond.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1ec59c12
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1ec59c12
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1ec59c12

Branch: refs/heads/branch-0.11.2
Commit: 1ec59c12af5020cbcb65d13dde4de37a96d10eb1
Parents: 191faa0
Author: Jinho Kim <jhkim@apache.org>
Authored: Fri Mar 18 11:29:10 2016 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Fri Mar 18 11:29:10 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../tajo/querymaster/DefaultTaskScheduler.java  | 54 ++++++++++++++------
 .../apache/tajo/querymaster/TaskAttempt.java    |  1 +
 3 files changed, 43 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1ec59c12/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index d6ae247..71ba9d5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.2 - unreleased
 
   BUG FIXES
 
+    TAJO-2100: Add missing cancellation in defaultTaskScheduler when a worker is 
+    no respond. (jinho)
+
     TAJO-2092: TestStorages.testNullHandlingTypesWithProjection occasionally fail.
     (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1ec59c12/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index c6eaaae..256fa50 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -21,6 +21,7 @@ package org.apache.tajo.querymaster;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.TaskRequest;
 import org.apache.tajo.engine.query.TaskRequestImpl;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -53,9 +55,11 @@ import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.RpcParameterFactory;
 import org.apache.tajo.util.TUtil;
 
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -168,6 +172,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     log.info(String.format("[%s] %s", stage.getId(), message));
   }
 
+  protected void warn(Log log, String message) {
+    log.warn(String.format("[%s] %s", stage.getId(), message));
+  }
+
   private Fragment[] fragmentsForNonLeafTask;
   private Fragment[] broadcastFragmentsForNonLeafTask;
 
@@ -596,7 +604,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     }
   }
 
-  public void cancel(TaskAttempt taskAttempt) {
+  protected void cancel(TaskAttempt taskAttempt) {
 
     TaskAttemptToSchedulerEvent schedulerEvent = new TaskAttemptToSchedulerEvent(
         EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(),
@@ -614,6 +622,16 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         new TaskAttemptEvent(taskAttempt.getId(), TaskAttemptEventType.TA_ASSIGN_CANCEL));
   }
 
+  protected int cancel(List<TaskAllocationProto> tasks) {
+    int canceled = 0;
+    for (TaskAllocationProto proto : tasks) {
+      TaskAttemptId attemptId = new TaskAttemptId(proto.getTaskRequest().getId());
+      cancel(stage.getTask(attemptId.getTaskId()).getAttempt(attemptId));
+      canceled++;
+    }
+    return canceled;
+  }
+
   private class ScheduledRequests {
     // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. Even though some task is included in
     // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner
@@ -899,17 +917,20 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             BatchAllocationResponse responseProto = callFuture.get();
 
             if (responseProto.getCancellationTaskCount() > 0) {
-              for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) {
-                cancel(task.getAttempt(new TaskAttemptId(proto.getTaskRequest().getId())));
-                cancellation++;
-              }
-
+              cancellation += cancel(responseProto.getCancellationTaskList());
               info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " +  addr);
               continue;
             }
+          } catch (ExecutionException | ConnectException e) {
+            cancellation += cancel(requestProto.getTaskRequestList());
+
+            warn(LOG, "Canceled requests: " + requestProto.getTaskRequestCount()
+                + " by " + ExceptionUtils.getFullStackTrace(e));
+            continue;
           } catch (Exception e) {
-            LOG.error(e);
+            throw new TajoInternalError(e);
           }
+
           scheduledObjectNum--;
           totalAssigned++;
           hostLocalAssigned += localAssign;
@@ -1009,26 +1030,29 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           try {
             tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true,
                 rpcParams);
+
             TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
             tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture);
 
             BatchAllocationResponse responseProto = callFuture.get();
 
             if(responseProto.getCancellationTaskCount() > 0) {
-              for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) {
-                cancel(task.getAttempt(new TaskAttemptId(proto.getTaskRequest().getId())));
-                cancellation++;
-              }
-
+              cancellation += cancel(responseProto.getCancellationTaskList());
               info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " +  addr);
               continue;
             }
 
-            totalAssigned++;
-            scheduledObjectNum--;
+          } catch (ExecutionException | ConnectException e) {
+            cancellation += cancel(requestProto.getTaskRequestList());
+            warn(LOG, "Canceled requests: " + requestProto.getTaskRequestCount()
+                + " by " + ExceptionUtils.getFullStackTrace(e));
+            continue;
           } catch (Exception e) {
-            LOG.error(e);
+            throw new TajoInternalError(e);
           }
+
+          totalAssigned++;
+          scheduledObjectNum--;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1ec59c12/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
index 5eef883..04432c6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
@@ -322,6 +322,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
       }
       TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
       taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
+      taskAttempt.getTask().setLaunchTime(System.currentTimeMillis());
       taskAttempt.eventHandler.handle(
           new TaskTAttemptEvent(taskAttempt.getId(),
               TaskEventType.T_ATTEMPT_LAUNCHED));


Mime
View raw message