tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-1707: Rack local count can be more than actual number of tasks.
Date Wed, 02 Sep 2015 08:04:59 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 2c9305add -> f2c434332


TAJO-1707: Rack local count can be more than actual number of tasks.

Closes #717


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f2c43433
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f2c43433
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f2c43433

Branch: refs/heads/master
Commit: f2c434332822fbf53d9b22810509baf5c9718e56
Parents: 2c9305a
Author: Jinho Kim <jhkim@apache.org>
Authored: Wed Sep 2 17:04:13 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Wed Sep 2 17:04:13 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../tajo/querymaster/AbstractTaskScheduler.java |  6 ++++
 .../tajo/querymaster/DefaultTaskScheduler.java  | 38 ++++++++++----------
 3 files changed, 29 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/f2c43433/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b2ace54..23291e5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -234,6 +234,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1707: Rack local count can be more than actual number of tasks.
+    (jinho)
+
     TAJO-1808: Wrong table type problem in catalog. (jihoon)
 
     TAJO-1801: Table name is not unique of tableDescMap in QueryMasterTask. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f2c43433/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
index 8636eaa..1651bb3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
@@ -32,6 +32,8 @@ public abstract class AbstractTaskScheduler extends AbstractService implements E
   protected int rackLocalAssigned;
   protected int totalAssigned;
   protected int cancellation;
+  protected int totalAttempts;
+
   protected Set<String> leafTaskHosts = Sets.newHashSet();
 
   /**
@@ -59,6 +61,10 @@ public abstract class AbstractTaskScheduler extends AbstractService implements E
     return cancellation;
   }
 
+  public int getTotalAttempts() {
+    return totalAttempts;
+  }
+
   public abstract void releaseTaskAttempt(TaskAttempt taskAttempt);
   public abstract int remainingScheduledObjectNum();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f2c43433/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index f9a5767..d380295 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -98,6 +98,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     scheduledRequests = new ScheduledRequests();
     minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY);
     schedulerDelay= tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY);
+    isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock());
 
     this.schedulingThread = new Thread() {
       public void run() {
@@ -129,7 +130,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   public void start() {
     LOG.info("Start TaskScheduler");
     maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size() * 2);
-    isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock());
 
     if (isLeaf) {
       candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts()));
@@ -686,9 +686,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           //find remaining local task
           if (leafTasks.contains(attemptId)) {
             leafTasks.remove(attemptId);
-            //LOG.info(attemptId + " Assigned based on host match " + hostName);
-            hostLocalAssigned++;
-            totalAssigned++;
             return attemptId;
           }
         }
@@ -755,15 +752,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
       }
 
-      if (attemptId != null) {
-        rackLocalAssigned++;
-        totalAssigned++;
-
-        LOG.info(String.format("Assigned Local/Rack/Cancel/Total: (%d/%d/%d/%d), Locality: %.2f%%, Rack host: %s",
-            hostLocalAssigned, rackLocalAssigned, cancellation, totalAssigned,
-            ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
-
-      }
       return attemptId;
     }
 
@@ -775,6 +763,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
       TaskRequestEvent taskRequest;
       while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
+        int localAssign = 0;
+        int rackAssign = 0;
+
         taskRequest = taskRequests.pollFirst();
         if(taskRequest == null) { // if there are only remote task requests
           taskRequest = remoteTaskRequests.pollFirst();
@@ -855,13 +846,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             synchronized (leafTasks){
               attemptId = leafTasks.iterator().next();
               leafTasks.remove(attemptId);
-              rackLocalAssigned++;
-              totalAssigned++;
-              LOG.info(String.format("Assigned Local/Remote/Cancel/Total: (%d/%d/%d/%d), Locality: %.2f%%,",
-                  hostLocalAssigned, rackLocalAssigned, cancellation, totalAssigned,
-                  ((double) hostLocalAssigned / (double) totalAssigned) * 100));
             }
           }
+          rackAssign++;
+        } else {
+          localAssign++;
         }
 
         if (attemptId != null) {
@@ -894,6 +883,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
           AsyncRpcClient tajoWorkerRpc = null;
           CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
+          totalAttempts++;
           try {
             tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true);
             TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
@@ -917,6 +907,18 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             LOG.error(e);
           }
           scheduledObjectNum--;
+          totalAssigned++;
+          hostLocalAssigned += localAssign;
+          rackLocalAssigned += rackAssign;
+
+          if (rackAssign > 0) {
+            LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), " +
+                    "Attempted Cancel/Assign/Total: (%d/%d/%d), " +
+                    "Locality: %.2f%%, Rack host: %s",
+                hostLocalAssigned, rackLocalAssigned, totalAssigned,
+                cancellation, totalAssigned, totalAttempts,
+                ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
+          }
 
         } else {
           throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");


Mime
View raw message