tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-2081: Incorrect task locality on single node.
Date Fri, 04 Mar 2016 06:09:59 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 0d94efe0f -> fdb76ed2c


TAJO-2081: Incorrect task locality on single node.

Closes #968


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fdb76ed2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fdb76ed2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fdb76ed2

Branch: refs/heads/master
Commit: fdb76ed2c7c06e7a9b43bbea18e202586b88606a
Parents: 0d94efe
Author: Jinho Kim <jhkim@apache.org>
Authored: Fri Mar 4 15:09:10 2016 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Fri Mar 4 15:09:10 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |  2 +
 .../tajo/querymaster/DefaultTaskScheduler.java  | 64 +++++++++-----------
 3 files changed, 34 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/fdb76ed2/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 50e1de0..7892aa2 100644
--- a/CHANGES
+++ b/CHANGES
@@ -106,6 +106,8 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-2081: Incorrect task locality on single node. (jinho)
+
     TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on an 
     union block. (jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fdb76ed2/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index a535ece..c36f43b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -203,6 +203,8 @@ public class TajoConf extends Configuration {
 
     QUERYMASTER_TASK_SCHEDULER_DELAY("tajo.qm.task-scheduler.delay", 50),  // 50 ms
 
+    QUERYMASTER_TASK_SCHEDULER_REQUEST_MAX_NUM("tajo.qm.task-scheduler.request.max-num", 50),
+
     // Catalog
     CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "localhost:26005", Validators.networkAddr()),
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fdb76ed2/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index e290184..a0e76cc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -64,8 +64,6 @@ import static org.apache.tajo.ResourceProtos.*;
 public class DefaultTaskScheduler extends AbstractTaskScheduler {
   private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
 
-  private static final String REQUEST_MAX_NUM = "tajo.qm.task-scheduler.request.max-num";
-
   private final TaskSchedulerContext context;
   private Stage stage;
   private TajoConf tajoConf;
@@ -123,7 +121,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             break;
           }
         }
-        LOG.info("TaskScheduler schedulingThread stopped");
+        info(LOG, "TaskScheduler schedulingThread stopped");
       }
     };
     super.init(conf);
@@ -131,8 +129,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
   @Override
   public void start() {
-    LOG.info("Start TaskScheduler");
-    maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size());
+    info(LOG, "Start TaskScheduler");
+    maximumRequestContainer = Math.min(tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_REQUEST_MAX_NUM)
+        , stage.getContext().getWorkerMap().size());
 
     if (isLeaf) {
       candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts()));
@@ -160,10 +159,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     }
     candidateWorkers.clear();
     scheduledRequests.clear();
-    LOG.info("Task Scheduler stopped");
+    info(LOG, "Task Scheduler stopped");
     super.stop();
   }
 
+  protected void info(Log log, String message) {
+    log.info(String.format("[%s] %s", stage.getId(), message));
+  }
+
   private Fragment[] fragmentsForNonLeafTask;
   private Fragment[] broadcastFragmentsForNonLeafTask;
 
@@ -417,7 +420,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       TaskAttemptId taskAttemptId = null;
 
       if (unassignedTaskForEachVolume.size() >  0) {
-        int retry = unassignedTaskForEachVolume.size();
+        int retry = diskVolumeLoads.size();
         do {
           //clean and get a remaining local task
           taskAttemptId = getAndRemove(volumeId);
@@ -473,6 +476,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           Iterator<TaskAttempt> iterator = list.iterator();
           taskAttempt = iterator.next();
           iterator.remove();
+          remainTasksNum.decrementAndGet();
         }
 
         taskAttemptId = taskAttempt.getId();
@@ -484,6 +488,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
 
         increaseConcurrency(volumeId);
+      } else {
+        unassignedTaskForEachVolume.remove(volumeId);
       }
 
       return taskAttemptId;
@@ -519,14 +525,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       }
 
       if (volumeId > -1) {
-        LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
+        info(LOG, "Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
       } else if (volumeId == -1) {
         // this case is disabled namenode block meta or compressed text file or amazon s3
-        LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
+        info(LOG, "Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
       } else if (volumeId == REMOTE) {
         // this case has processed all block on host and it will be assigned to remote
-        LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
-            + ", Remote Concurrency : " + concurrency);
+        info(LOG, "Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
+            + ", Remote Concurrency : " + concurrency + ", Unassigned volumes: " + unassignedTaskForEachVolume.size());
       }
       diskVolumeLoads.put(volumeId, concurrency);
       return concurrency;
@@ -537,13 +543,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
      */
     private synchronized void decreaseConcurrency(int volumeId){
       if(diskVolumeLoads.containsKey(volumeId)){
-        Integer concurrency = diskVolumeLoads.get(volumeId);
+        int concurrency = diskVolumeLoads.get(volumeId);
         if(concurrency > 0){
           diskVolumeLoads.put(volumeId, concurrency - 1);
-        } else {
-          if (volumeId > REMOTE && !unassignedTaskForEachVolume.containsKey(volumeId)) {
-            diskVolumeLoads.remove(volumeId);
-          }
         }
       }
     }
@@ -559,7 +561,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
         if(volumeEntry == null) volumeEntry = entry;
 
-        if (volumeEntry.getValue() >= entry.getValue()) {
+        if (entry.getKey() != REMOTE && volumeEntry.getValue() >= entry.getValue()) {
           volumeEntry = entry;
         }
       }
@@ -596,19 +598,16 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
   public void cancel(TaskAttempt taskAttempt) {
 
+    TaskAttemptToSchedulerEvent schedulerEvent = new TaskAttemptToSchedulerEvent(
+        EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(),
+        null, taskAttempt);
+
     if(taskAttempt.isLeafTask()) {
       releaseTaskAttempt(taskAttempt);
 
-      List<DataLocation> locations = taskAttempt.getTask().getDataLocations();
-
-      for (DataLocation location : locations) {
-        HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
-        volumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt);
-      }
-
-      scheduledRequests.leafTasks.add(taskAttempt.getId());
+      scheduledRequests.addLeafTask(schedulerEvent);
     } else {
-      scheduledRequests.nonLeafTasks.add(taskAttempt.getId());
+      scheduledRequests.addNonLeafTask(schedulerEvent);
     }
 
     context.getMasterContext().getEventHandler().handle(
@@ -826,7 +825,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
                 tailLimit = Math.max(remainingScheduledObjectNum() / nodes, 1);
               }
 
-              if (hostVolumeMapping.getRemoteConcurrency() >= tailLimit) { //remote task throttling per node
+              //remote task throttling per node
+              if (nodes > 1 && hostVolumeMapping.getRemoteConcurrency() >= tailLimit) {
                 continue;
               } else {
                 // assign to remote volume
@@ -904,9 +904,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
                 cancellation++;
               }
 
-              if(LOG.isDebugEnabled()) {
-                LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " +  addr);
-              }
+              info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " +  addr);
               continue;
             }
           } catch (Exception e) {
@@ -918,7 +916,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           rackLocalAssigned += rackAssign;
 
           if (rackAssign > 0) {
-            LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), " +
+            info(LOG, String.format("Assigned Local/Rack/Total: (%d/%d/%d), " +
                     "Attempted Cancel/Assign/Total: (%d/%d/%d), " +
                     "Locality: %.2f%%, Rack host: %s",
                 hostLocalAssigned, rackLocalAssigned, totalAssigned,
@@ -1022,9 +1020,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
                 cancellation++;
               }
 
-              if(LOG.isDebugEnabled()) {
-                LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " +  addr);
-              }
+              info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " +  addr);
               continue;
             }
 


Mime
View raw message