tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-221. Deadlock in DAGSchedulerMRR when setting slow start to 0.5f on a small cluster (bikas)
Date Thu, 20 Jun 2013 06:19:07 GMT
Updated Branches:
  refs/heads/master a5f2bb64b -> e81396036


TEZ-221. Deadlock in DAGSchedulerMRR when setting slow start to 0.5f on a small cluster (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e8139603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e8139603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e8139603

Branch: refs/heads/master
Commit: e81396036ea5c86c0835c0c77910bab04d4335e5
Parents: a5f2bb6
Author: Bikas Saha <bikas@apache.org>
Authored: Wed Jun 19 23:15:16 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Wed Jun 19 23:15:16 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |   6 ++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   1 -
 .../apache/tez/dag/app/dag/DAGScheduler.java    |   3 +
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |   2 +
 .../app/dag/event/DAGEventSchedulerUpdate.java  |   1 +
 .../DAGEventSchedulerUpdateTAAssigned.java      |  36 +++++++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  17 +++-
 .../tez/dag/app/dag/impl/DAGSchedulerMRR.java   | 101 +++++++++++++++++--
 .../app/dag/impl/DAGSchedulerNaturalOrder.java  |   5 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   5 +
 .../apache/tez/dag/app/rm/TaskScheduler.java    |   3 +
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  48 +--------
 .../tez/dag/app/dag/impl/TestDAGScheduler.java  |   2 +-
 13 files changed, 171 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index ceec42c..f8652f5 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -112,6 +112,12 @@ public class TezConfiguration extends Configuration {
           + "slowstart-vertex-scheduler.max-src-fraction";
   public static final float
           SLOWSTART_VERTEX_SCHEDULER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
+  
+  public static final String 
+          SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION = TEZ_AM_PREFIX
+          + "slowstart-dag-scheduler.min-resource-fraction";
+  public static final float 
+          SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION_DEFAULT = 0.5f;
 
   /**
    * The complete path to the serialized dag plan file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 118410e..d195e2d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -27,7 +27,6 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
index e68b31e..2d3b006 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
@@ -19,12 +19,15 @@
 package org.apache.tez.dag.app.dag;
 
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 
 public interface DAGScheduler {
   
   public void vertexCompleted(Vertex vertex);
   
   public void scheduleTask(DAGEventSchedulerUpdate event);
+  
+  public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event);
 
   public void taskSucceeded(DAGEventSchedulerUpdate event);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 2628f0c..87d41b9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -29,6 +29,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 
 /**
@@ -36,6 +37,7 @@ import org.apache.tez.dag.records.TezVertexID;
  */
 public interface TaskAttempt {
   TezTaskAttemptID getID();
+  TezTaskID getTaskID();
   TezVertexID getVertexID();
   TezDAGID getDAGID();
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
index 9af1376..a436a8c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
@@ -24,6 +24,7 @@ public class DAGEventSchedulerUpdate extends DAGEvent {
   
   public enum UpdateType {
     TA_SCHEDULE,
+    TA_SCHEDULED,
     TA_SUCCEEDED
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
new file mode 100644
index 0000000..8e27843
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+
+public class DAGEventSchedulerUpdateTAAssigned extends DAGEventSchedulerUpdate {
+  
+  final Container container;
+
+  public DAGEventSchedulerUpdateTAAssigned(TaskAttempt attempt, Container container) {
+    super(DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULED, attempt);
+    this.container = container;
+  }
+
+  public Container getContainer() {
+    return container;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 80a24a1..90361c6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -76,6 +76,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
@@ -910,8 +911,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
       if(isMRR) {
         LOG.info("Using MRR dag scheduler");
-        dag.dagScheduler = new DAGSchedulerMRR(dag, dag.eventHandler,
-            dag.appContext.getTaskScheduler());
+        dag.dagScheduler = new DAGSchedulerMRR(
+            dag,
+            dag.eventHandler,
+            dag.appContext.getTaskScheduler(),
+            dag.conf
+                .getFloat(
+        TezConfiguration.SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION,
+        TezConfiguration.SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION_DEFAULT));
       } else {
         LOG.info("Using Natural order dag scheduler");
         dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
@@ -1208,6 +1215,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     switch(sEvent.getUpdateType()) {
       case TA_SCHEDULE:
         dag.dagScheduler.scheduleTask(sEvent);
+        break;
+      case TA_SCHEDULED:
+        DAGEventSchedulerUpdateTAAssigned taEvent = 
+                              (DAGEventSchedulerUpdateTAAssigned) sEvent;
+        dag.dagScheduler.taskScheduled(taEvent);
+        break;
       case TA_SUCCEEDED:
         dag.dagScheduler.taskSucceeded(sEvent);
         break;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
index acf766a..d4a347e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
@@ -18,8 +18,10 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,8 +34,10 @@ import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.records.TezTaskID;
 
 @SuppressWarnings("rawtypes")
 public class DAGSchedulerMRR implements DAGScheduler {
@@ -43,18 +47,27 @@ public class DAGSchedulerMRR implements DAGScheduler {
   private final DAG dag;
   private final TaskSchedulerEventHandler taskScheduler;
   private final EventHandler handler;
+  
+  private final float minReservedShuffleResource;
+  
   private Vertex currentPartitioner = null;
   private Vertex currentShuffler = null;
   private int currentShufflerDepth = 0;
   
   int numShuffleTasksScheduled = 0;
   List<TaskAttempt> pendingShuffleTasks = new LinkedList<TaskAttempt>();
-  
+  Set<TezTaskID> unassignedShuffleTasks = new HashSet<TezTaskID>();
+  Resource realShufflerResource = null;
+
+  Set<TezTaskID> unassignedPartitionTasks = new HashSet<TezTaskID>();
+  Resource realPartitionerResource = null;
+
   public DAGSchedulerMRR(DAG dag, EventHandler dispatcher,
-      TaskSchedulerEventHandler taskScheduler) {
+      TaskSchedulerEventHandler taskScheduler, float minReservedShuffleResource) {
     this.dag = dag;
     this.handler = dispatcher;
     this.taskScheduler = taskScheduler;
+    this.minReservedShuffleResource = minReservedShuffleResource;
   }
   
   @Override
@@ -73,6 +86,11 @@ public class DAGSchedulerMRR implements DAGScheduler {
              currentShuffler.getVertexId() + " is new partitioner":
              "No current shuffler to replace the partitioner"));
       currentPartitioner = currentShuffler;
+      assert unassignedPartitionTasks.isEmpty();
+      unassignedPartitionTasks.addAll(unassignedShuffleTasks);
+      unassignedShuffleTasks.clear();
+      realPartitionerResource = realShufflerResource;
+      realShufflerResource = null;
       currentShuffler = null;
       // schedule all pending shuffle tasks
       schedulePendingShuffles(pendingShuffleTasks.size());
@@ -88,10 +106,17 @@ public class DAGSchedulerMRR implements DAGScheduler {
     Vertex vertex = dag.getVertex(attempt.getVertexID());
     int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
     
+    LOG.info("Schedule task: " + attempt.getID());
+    
     if(currentPartitioner == null) {
       // no partitioner. so set it.
       currentPartitioner = vertex;
       currentShufflerDepth = vertexDistanceFromRoot;
+      assert realPartitionerResource == null;
+      Resource partitionerResource = currentPartitioner.getTaskResource();
+      realPartitionerResource = Resource.newInstance(
+          partitionerResource.getMemory(),
+          partitionerResource.getVirtualCores());
       LOG.info(vertex.getVertexId() + " is new partitioner at depth "
           + vertexDistanceFromRoot);
     } else if (currentShuffler == null && 
@@ -100,16 +125,26 @@ public class DAGSchedulerMRR implements DAGScheduler {
       // shuffler. this must be the new shuffler.
       currentShuffler = vertex;
       currentShufflerDepth = vertexDistanceFromRoot;
+      assert realShufflerResource == null;
+      Resource shufflerResource = currentShuffler.getTaskResource();
+      realShufflerResource = Resource.newInstance(
+          shufflerResource.getMemory(),
+          shufflerResource.getVirtualCores());
       LOG.info(vertex.getVertexId() + " is new shuffler at depth "
           + currentShufflerDepth);
     }
     
     if(currentShuffler == vertex) {
       pendingShuffleTasks.add(attempt);
+      unassignedShuffleTasks.add(attempt.getTaskID());
       schedulePendingShuffles(getNumShufflesToSchedule());
       return;
     }
     
+    if(currentPartitioner == vertex) {
+      unassignedPartitionTasks.add(attempt.getTaskID());
+    }
+    
     // sanity check
     // task should be a partitioner, a shuffler or a retry of an ancestor
     if(currentPartitioner != vertex && currentShuffler != vertex && 
@@ -125,9 +160,35 @@ public class DAGSchedulerMRR implements DAGScheduler {
   }
   
   @Override
+  public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
+    TaskAttempt attempt = event.getAttempt();
+    Vertex vertex = dag.getVertex(attempt.getVertexID());
+    LOG.info("Task assigned: " + attempt.getID() + " Vertex: Total:"
+        + vertex.getTotalTasks() + " succeeded: " + vertex.getSucceededTasks()
+        + " Resource: " + event.getContainer().getResource().getMemory());
+
+    if (currentPartitioner == vertex) {
+      unassignedPartitionTasks.remove(attempt.getTaskID());
+      Resource resource = event.getContainer().getResource();
+      if(resource.getMemory() > realPartitionerResource.getMemory()) {
+        realPartitionerResource.setMemory(resource.getMemory());
+      }
+    } else if (currentShuffler == vertex) {
+      unassignedShuffleTasks.remove(attempt.getTaskID());
+      Resource resource = event.getContainer().getResource();
+      if(resource.getMemory() > realShufflerResource.getMemory()) {
+        realShufflerResource.setMemory(resource.getMemory());
+      }
+    }
+  }
+  
+  @Override
   public void taskSucceeded(DAGEventSchedulerUpdate event) {
     TaskAttempt attempt = event.getAttempt();
     Vertex vertex = dag.getVertex(attempt.getVertexID());
+    LOG.info("Task succeeded: " + attempt.getID() + " Vertex: Total:" + vertex.getTotalTasks() +
+        " succeeded: " + vertex.getSucceededTasks());
+
     if (currentPartitioner == vertex) {
       // resources now available. try to schedule pending shuffles
       schedulePendingShuffles(getNumShufflesToSchedule());
@@ -141,29 +202,39 @@ public class DAGSchedulerMRR implements DAGScheduler {
       return 0;
     }
     
+    if(unassignedPartitionTasks.isEmpty()) {
+      return pendingShuffleTasks.size();
+    }
+    
     assert currentShuffler != null;
     
     // get total resource limit
     Resource totalResources = taskScheduler.getTotalResources();
+    Resource freeResources = taskScheduler.getAvailableResources();
     int totalMem = totalResources.getMemory();
+    int freeMem = freeResources.getMemory();
+    int partitionerTaskMem = realPartitionerResource.getMemory();
+    int shufflerTaskMem = realShufflerResource.getMemory();
+    int shufflerMemAssigned = shufflerTaskMem * numShuffleTasksScheduled;
     
     // get resources needed by partitioner
-    Resource partitionerResource = currentPartitioner.getTaskResource();
-    int partitionerTaskMem = partitionerResource.getMemory();
     int numPartionersLeft = currentPartitioner.getTotalTasks()
         - currentPartitioner.getSucceededTasks();
     int partitionerMemNeeded = numPartionersLeft * partitionerTaskMem;
     
     // find leftover resources for shuffler
     int shufflerMemLeft = totalMem - partitionerMemNeeded;
+
+    int defaultShufflerMem = (int)(minReservedShuffleResource * totalMem);
+    
+    if(shufflerMemLeft < defaultShufflerMem) {
+      shufflerMemLeft = defaultShufflerMem;
+    }
     
-    Resource shufflerResource = currentShuffler.getTaskResource();
-    int shufflerTaskMem = shufflerResource.getMemory();
-    int shufflerMemAssigned = shufflerTaskMem * numShuffleTasksScheduled;
     shufflerMemLeft -= shufflerMemAssigned;
 
     LOG.info("TotalMem: " + totalMem + 
-             " Headroom: " + taskScheduler.getAvailableResources().getMemory() +
+             " Headroom: " + freeMem +
              " PartitionerTaskMem: " + partitionerTaskMem +
              " ShufflerTaskMem: " + shufflerTaskMem + 
              " PartitionerMemNeeded:" + partitionerMemNeeded +
@@ -204,8 +275,18 @@ public class DAGSchedulerMRR implements DAGScheduler {
     int priority = (vertexDistanceFromRoot + 1) * 3;
     
     if(currentShuffler == vertex) {
-      // current shuffler vertex. assign special priority
-      reOrderPriority = true;
+      assert currentPartitioner != null;
+      // assign higher priority only if its needed. If all partitioners are done
+      // then no need to do so.
+      // TODO fix with assigned instead of succeeded
+      if (!unassignedPartitionTasks.isEmpty()) {
+        // current shuffler vertex to be scheduled while current partitioner is
+        // still running. This needs to be higher priority or else it wont get 
+        // allocated. This higher priority will be lower than the priority of a 
+        // partitioner task that is a retry. so retries are safe.
+        // assign special priority
+        reOrderPriority = true;
+      }
     }
     
     if(reOrderPriority) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
index e6f0d29..fc52344 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
@@ -27,6 +27,7 @@ import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 
 @SuppressWarnings("rawtypes")
@@ -73,6 +74,10 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler {
   }
   
   @Override
+  public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
+  }
+
+  @Override
   public void taskSucceeded(DAGEventSchedulerUpdate event) {
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 72e5cea..4473b3e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -292,6 +292,11 @@ public class TaskAttemptImpl implements TaskAttempt,
   public TezTaskAttemptID getID() {
     return attemptId;
   }
+  
+  @Override 
+  public TezTaskID getTaskID() {
+    return attemptId.getTaskID();
+  }
 
   @Override
   public TezVertexID getVertexID() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 75aa434..0567586 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -349,6 +349,9 @@ public class TaskScheduler extends AbstractService
     if(isStopped) {
       return 1;
     }
+    LOG.info("TEMP dagschedulermrr: sync with RM finished. Headroom: "
+        + getAvailableResources().getMemory() + " Allocations: "
+        + taskAllocations.size());
     if(totalResources.getMemory() == 0) {
       // TODO this will not handle dynamic changes
       // assume this is the first allocate callback. nothing is allocated.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 1bd088d..1f2ece5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -22,7 +22,6 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -35,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -48,6 +46,7 @@ import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
 import org.apache.tez.dag.app.rm.container.AMContainerEvent;
@@ -371,49 +370,7 @@ public class TaskSchedulerEventHandler extends AbstractService
   @Override
   public synchronized void taskAllocated(Object task, 
                                            Object appCookie, 
-                                           Container container) {
-    /*
-    availableContainerIds.addAll(event.getContainerIds());
-
-    completedMaps = getJob().getCompletedMaps();
-    completedReduces = getJob().getCompletedReduces();
-    int completedTasks = completedMaps + completedReduces;
-
-    if (lastCompletedTasks != completedTasks) {
-      recalculateReduceSchedule = true;
-      lastCompletedTasks = completedTasks;
-    }
-
-    if (event.didHeadroomChange() || event.getContainerIds().size() > 0) {
-      recalculateReduceSchedule = true;
-    }
-    schedule();
-    .....
-          // Update resource requests
-      requestor.decContainerReq(assigned.getContainerRequest());
-  
-      // TODO Maybe: ApplicationACLs should be populated into the appContext from the RMCommunicator.
-      ContainerId containerId = allocated.getId();
-      if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
-        AMSchedulerTALaunchRequestEvent tlrEvent = attemptToLaunchRequestMap
-            .get(assigned.getAttemptId());
-        JobConf jobConf = new JobConf(getJob().getConf());
-  
-        AMContainerEventLaunchRequest launchRequest = new AMContainerEventLaunchRequest(
-            containerId, jobId, assigned.getAttemptId().getTaskId()
-                .getTaskType(), tlrEvent.getJobToken(),
-            tlrEvent.getCredentials(), shouldProfileTaskAttempt(
-                jobConf, tlrEvent.getRemoteTaskContext()), jobConf);
-  
-        eventHandler.handle(launchRequest);
-      }
-      eventHandler.handle(new AMContainerEventAssignTA(containerId,
-          assigned.getAttemptId(), attemptToLaunchRequestMap.get(
-              assigned.getAttemptId()).getRemoteTaskContext()));
-  
-      assignedRequests.add(allocated, assigned.getAttemptId());
-    */
-    
+                                           Container container) {    
     ContainerId containerId = container.getId();
     appContext.getAllContainers().addContainerIfNew(container);
     appContext.getAllNodes().nodeSeen(container.getNodeId());   
@@ -440,6 +397,7 @@ public class TaskSchedulerEventHandler extends AbstractService
           taskAttempt.getEnvironment(),
           taskAttempt.getJavaOpts()));
     }
+    sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
     sendEvent(new AMContainerEventAssignTA(containerId,
         taskAttempt.getID(), event.getRemoteTaskContext()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e8139603/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index d971914..8a67977 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -122,7 +122,7 @@ public class TestDAGScheduler {
     DAGEventSchedulerUpdate mockEvent3 = mock(DAGEventSchedulerUpdate.class);
     when(mockEvent3.getAttempt()).thenReturn(mockAttempt3);
     DAGScheduler scheduler = new DAGSchedulerMRR(mockDag, mockEventHandler,
-        mockTaskScheduler);
+        mockTaskScheduler, 0.5f);
     
     // M starts. M completes. R1 starts. R1 completes. R2 starts. R2 completes
     scheduler.scheduleTask(mockEvent1); // M starts


Mime
View raw message