tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject git commit: TAJO-588: In some cases, leaf tasks of DefaultTaskScheduler are not executed in a distributed manner. (jinho)
Date Thu, 06 Feb 2014 09:47:25 GMT
Updated Branches:
  refs/heads/master d0206493b -> 009e0253c


TAJO-588: In some cases, leaf tasks of DefaultTaskScheduler are not executed in a distributed manner. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/009e0253
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/009e0253
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/009e0253

Branch: refs/heads/master
Commit: 009e0253ca04dd4b353a3f383571123e1e75719c
Parents: d020649
Author: jinossy <jinossy@gmail.com>
Authored: Thu Feb 6 18:46:30 2014 +0900
Committer: jinossy <jinossy@gmail.com>
Committed: Thu Feb 6 18:46:30 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../java/org/apache/tajo/conf/TajoConf.java     |  2 +-
 .../tajo/master/DefaultTaskScheduler.java       | 33 ++++++++++++++------
 .../tajo/master/querymaster/SubQuery.java       | 31 +++++++++++-------
 4 files changed, 48 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/009e0253/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7f58b4a..36c1062 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -246,6 +246,9 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-588: In some case, leaf task of DefaultTaskScheduler are not 
+    distributed execution. (jinho)
+
     TAJO-586: containFunction shouldn't throw NoSuchFunctionException. (jinho)
 
     TAJO-582: Invalid split calculation. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/009e0253/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index d465ca3..7c82d72 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -202,7 +202,7 @@ public class TajoConf extends Configuration {
     // Task Configuration
     TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
     TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f),
-    TASK_DEFAULT_SIZE("tajo.task.size-mb", 64),
+    TASK_DEFAULT_SIZE("tajo.task.size-mb", 128),
     //////////////////////////////////
 
     // Metrics

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/009e0253/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index d8b7d75..b6bde94 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -92,13 +92,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       public void run() {
 
         while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          schedule();
           try {
-            Thread.sleep(100);
+            synchronized (schedulingThread){
+              schedulingThread.wait(100);
+            }
           } catch (InterruptedException e) {
             break;
           }
-
-          schedule();
         }
         LOG.info("TaskScheduler schedulingThread stopped");
       }
@@ -127,8 +128,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   @Override
   public void stop() {
     stopEventHandling = true;
+
     if (schedulingThread != null) {
-      schedulingThread.interrupt();
+      synchronized (schedulingThread) {
+        schedulingThread.notifyAll();
+      }
     }
 
     // Return all of request callbacks instantly.
@@ -221,6 +225,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   @Override
   public void handleTaskRequestEvent(TaskRequestEvent event) {
     taskRequests.handle(event);
+    int hosts = scheduledRequests.leafTaskHostMapping.size();
+
+    // If available cluster resources exceed the remaining tasks, wake the scheduler thread immediately.
+    if(remainingScheduledObjectNum() > 0 &&
+        (remainingScheduledObjectNum() <= hosts || hosts / 2 < taskRequests.size())){
+      synchronized (schedulingThread){
+        schedulingThread.notifyAll();
+      }
+    }
   }
 
   @Override
@@ -433,6 +446,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             attemptId = tId;
             //LOG.info(attemptId + " Assigned based on host match " + hostName);
             hostLocalAssigned++;
+            totalAssigned++;
             break;
           }
         }
@@ -448,8 +462,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             if (leafTasks.contains(tId)) {
               leafTasks.remove(tId);
               attemptId = tId;
-              //LOG.info(attemptId + "Assigned based on rack match " + rack);
+
               rackLocalAssigned++;
+              totalAssigned++;
+
+              LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%,
Rack host: %s",
+                  hostLocalAssigned, rackLocalAssigned, totalAssigned,
+                  ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
               break;
             }
           }
@@ -483,16 +502,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               host, container.getTaskPort()));
           assignedRequest.add(attemptId);
 
-          totalAssigned++;
           scheduledObjectNum -= task.getAllFragments().size();
           taskRequest.getCallback().run(taskAssign.getProto());
         } else {
           throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
         }
       }
-
-      LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
-      LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
     }
 
     private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/009e0253/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index a229169..02c2f6a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -549,7 +549,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       TajoConf conf = subQuery.context.getConf();
       subQuery.schedulerContext = new TaskSchedulerContext(subQuery.context,
           subQuery.getMasterPlan().isLeaf(subQuery.getId()), subQuery.getId());
-      subQuery.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) *
1024 * 1024);
       subQuery.taskScheduler = TaskSchedulerFactory.get(conf, subQuery.schedulerContext,
subQuery);
       subQuery.taskScheduler.init(conf);
       LOG.info(subQuery.taskScheduler.getName() + " is chosen for the task scheduling");
@@ -786,9 +785,18 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       }
 
       SubQuery.scheduleFragments(subQuery, fragments);
-      int estimatedTaskNum = (int) Math.ceil((double)table.getStats().getNumBytes() /
-          (double)subQuery.schedulerContext.getTaskSize());
-      subQuery.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
+      if (subQuery.getTaskScheduler() instanceof DefaultTaskScheduler) {
+        // For DefaultTaskScheduler, the leaf task size is the number of fragments.
+        // EstimatedTaskNum determines the number of initial containers.
+        subQuery.schedulerContext.setTaskSize(fragments.size());
+        subQuery.schedulerContext.setEstimatedTaskNum(fragments.size());
+      } else {
+        TajoConf conf = subQuery.context.getConf();
+        subQuery.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE)
* 1024 * 1024);
+        int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
+            (double) subQuery.schedulerContext.getTaskSize());
+        subQuery.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
+      }
     }
   }
 
@@ -888,17 +896,18 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
                            SubQueryEvent event) {
       SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
       QueryUnit task = subQuery.getQueryUnit(taskEvent.getTaskId());
-      QueryUnitAttempt taskAttempt = task.getSuccessfulAttempt();
-      if (task.isLeafTask()) {
-        subQuery.completedObjectCount += task.getTotalFragmentNum();
-      } else {
-        subQuery.completedObjectCount ++;
-      }
-      subQuery.completedTaskCount++;
 
       if (task == null) { // task failed
         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_FAILED));
       } else {
+        QueryUnitAttempt taskAttempt = task.getSuccessfulAttempt();
+        if (task.isLeafTask()) {
+          subQuery.completedObjectCount += task.getTotalFragmentNum();
+        } else {
+          subQuery.completedObjectCount++;
+        }
+        subQuery.completedTaskCount++;
+
         LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount
+ "/"
             + subQuery.schedulerContext.getEstimatedTaskNum() + " on " + taskAttempt.getHost()
+ ":" + taskAttempt.getPort());
         if (subQuery.taskScheduler.remainingScheduledObjectNum() == 0


Mime
View raw message