tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2217. The min-held-containers being released prematurely (bikas) (cherry picked from commit d42a3c7a78b14ba496721e5db4a63229c3cf011c)
Date Thu, 26 Mar 2015 17:09:22 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 6123493b5 -> 0249a4318


TEZ-2217. The min-held-containers being released prematurely (bikas)
(cherry picked from commit d42a3c7a78b14ba496721e5db4a63229c3cf011c)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0249a431
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0249a431
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0249a431

Branch: refs/heads/branch-0.5
Commit: 0249a4318a022b77d26167bdb1783b763be254f3
Parents: 6123493
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Mar 26 10:05:29 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Mar 26 10:08:39 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/app/rm/YarnTaskSchedulerService.java    | 53 +++++++++-----
 .../tez/dag/app/rm/TestTaskScheduler.java       | 77 ++++++++++++++++----
 3 files changed, 96 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0249a431/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 82e0ab5..b2383a0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2217. The min-held-containers being released prematurely
   TEZ-2214. FetcherOrderedGrouped can get stuck indefinitely when MergeManager misses memToDiskMerging
   TEZ-1923. FetcherOrderedGrouped gets into infinite loop due to memory pressure
   TEZ-2219. Should verify the input_name/output_name to be unique per vertex

http://git-wip-us.apache.org/repos/asf/tez/blob/0249a431/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index f8fbd53..bc3c216 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -353,10 +353,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             ", reuseNonLocal: " + reuseNonLocal + 
             ", localitySchedulingDelay: " + localitySchedulingDelay +
             ", preemptionPercentage: " + preemptionPercentage +
-            ", numHeartbeatsBetweenPreemptions" + numHeartbeatsBetweenPreemptions +
-            ", idleContainerMinTimeout=" + idleContainerTimeoutMin +
-            ", idleContainerMaxTimeout=" + idleContainerTimeoutMax +
-            ", sessionMinHeldContainers=" + sessionNumMinHeldContainers);
+            ", numHeartbeatsBetweenPreemptions: " + numHeartbeatsBetweenPreemptions +
+            ", idleContainerMinTimeout: " + idleContainerTimeoutMin +
+            ", idleContainerMaxTimeout: " + idleContainerTimeoutMax +
+            ", sessionMinHeldContainers: " + sessionNumMinHeldContainers);
   }
 
   @Override
@@ -561,6 +561,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   
   @VisibleForTesting
   long getHeldContainerExpireTime(long startTime) {
+    // expire time is at least extended by min time.
+    // corner case when min time = -1 but then it does not matter because
+    // expire time is irrelevant at that point.
     long expireTime = (startTime + idleContainerTimeoutMin);
     if (idleContainerTimeoutMin != -1 && idleContainerTimeoutMin < idleContainerTimeoutMax)
{
       long expireTimeMax = startTime + idleContainerTimeoutMax;
@@ -614,22 +617,23 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           && idleContainerTimeoutMin != -1)) {
         // container idle timeout has expired or is a new unused container. 
         // new container is possibly a spurious race condition allocation.
-        if (!isNew && appContext.isSession() && 
-            sessionMinHeldContainers.contains(heldContainer.getContainer().getId())) {
-          // Not a potentially spurious new container.
+        if (appContext.isSession()
+            && sessionMinHeldContainers.contains(heldContainer.getContainer().getId()))
{
+          // There are no outstanding requests. So its safe to hold new containers.
+          // We may have received more containers than necessary and some are unused
           // In session mode and container in set of chosen min held containers
           // increase the idle container expire time to maintain sanity with 
-          // the rest of the code
+          // the rest of the code.
           heldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime));
         } else {
-          releaseContainer = true;
+          releaseContainer = true;          
         }
       }
       
       if (releaseContainer) {
         LOG.info("No taskRequests. Container's idle timeout delay expired or is new. " +
             "Releasing container"
-            + ", containerId=" + heldContainer.container.getId()
+            + ", containerId=" + heldContainer.getContainer().getId()
             + ", containerExpiryTime="
             + heldContainer.getContainerExpiryTime()
             + ", idleTimeout=" + idleContainerTimeoutMin
@@ -638,7 +642,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
             + ", isNew=" + isNew);
           releaseUnassignedContainers(
-              Lists.newArrayList(heldContainer.container));        
+              Lists.newArrayList(heldContainer.getContainer()));        
       } else {
         // no outstanding work and container idle timeout not expired
         if (LOG.isDebugEnabled()) {
@@ -656,7 +660,20 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       }
    } else if (state.equals(DAGAppMasterState.RUNNING)) {
       // clear min held containers since we need to allocate to tasks
-      sessionMinHeldContainers.clear();
+      if (!sessionMinHeldContainers.isEmpty()) {
+        // update the expire time of min held containers so that they are
+        // not released immediately, when new requests come in, if they come in 
+        // just before these containers are about to expire (race condition)
+        long currentTime = System.currentTimeMillis();
+        for (ContainerId minHeldCId : sessionMinHeldContainers) {
+          HeldContainer minHeldContainer = heldContainers.get(minHeldCId);
+          if (minHeldContainer != null) {
+            // check in case it got removed because of external reasons
+            minHeldContainer.setContainerExpiryTime(getHeldContainerExpireTime(currentTime));
+          }
+        }
+        sessionMinHeldContainers.clear();
+      }
       HeldContainer.LocalityMatchLevel localityMatchLevel =
         heldContainer.getLocalityMatchLevel();
       Map<CookieContainerRequest, Container> assignedContainers =
@@ -1441,11 +1458,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   }
   
   private void pushNewContainerToDelayed(List<Container> containers){
-    long expireTime = -1;
-    if (idleContainerTimeoutMin > 0) {
-      long currentTime = System.currentTimeMillis();
-      expireTime = currentTime + idleContainerTimeoutMin;
-    }
+    long expireTime = getHeldContainerExpireTime(System.currentTimeMillis());
 
     synchronized (delayedContainerManager) {
       for (Container container : containers) {
@@ -1953,7 +1966,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       releaseUnassignedContainers(new ContainerIterable(pendingContainers));
     }
 
-    private void addDelayedContainer(Container container,
+    @VisibleForTesting
+    void addDelayedContainer(Container container,
         long nextScheduleTime) {
       HeldContainer delayedContainer = heldContainers.get(container.getId());
       if (delayedContainer == null) {
@@ -2055,7 +2069,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       }
     }
     
-    LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers");
+    LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers"
+        + " out of total held containers: " + heldContainers.size());
   }
 
   private class ContainerIterable implements Iterable<Container> {

http://git-wip-us.apache.org/repos/asf/tez/blob/0249a431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 5782d01..ad4ad1e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -991,12 +991,13 @@ public class TestTaskScheduler {
   }
   
   @SuppressWarnings("unchecked")
-  @Test(timeout=5000)
+  @Test (timeout=5000)
   public void testTaskSchedulerDetermineMinHeldContainers() throws Exception {
     RackResolver.init(new YarnConfiguration());
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
     AppContext mockAppContext = mock(AppContext.class);
     when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
+    when(mockAppContext.isSession()).thenReturn(true);
 
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(TezAMRMClientAsync.class);
@@ -1008,6 +1009,8 @@ public class TestTaskScheduler {
       new TaskSchedulerWithDrainableAppCallback(
         mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
         appUrl, mockRMClient, mockAppContext);
+    TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
+        .getDrainableAppCallback();
 
     Configuration conf = new Configuration();
     scheduler.init(conf);
@@ -1035,42 +1038,71 @@ public class TestTaskScheduler {
     String node1Rack3 = "n1r3";
     ApplicationAttemptId appId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0,
0), 0);
 
+    Resource r = Resource.newInstance(0, 0);
     ContainerId mockCId1 = ContainerId.newInstance(appId, 0);
-    HeldContainer hc1 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    Container c1 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(c1.getNodeId().getHost()).thenReturn(""); // we are mocking directly
+    HeldContainer hc1 = Mockito.spy(new HeldContainer(c1, 0, 0, null));
     when(hc1.getNode()).thenReturn(node1Rack1);
     when(hc1.getRack()).thenReturn(rack1);
-    when(hc1.getContainer().getId()).thenReturn(mockCId1);
+    when(c1.getId()).thenReturn(mockCId1);
+    when(c1.getResource()).thenReturn(r);
+    when(hc1.getContainer()).thenReturn(c1);
     ContainerId mockCId2 = ContainerId.newInstance(appId, 1);
-    HeldContainer hc2 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    Container c2 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(c2.getNodeId().getHost()).thenReturn(""); // we are mocking directly
+    HeldContainer hc2 = Mockito.spy(new HeldContainer(c2, 0, 0, null));
     when(hc2.getNode()).thenReturn(node2Rack1);
     when(hc2.getRack()).thenReturn(rack1);
-    when(hc2.getContainer().getId()).thenReturn(mockCId2);
+    when(c2.getId()).thenReturn(mockCId2);
+    when(c2.getResource()).thenReturn(r);
+    when(hc2.getContainer()).thenReturn(c2);
     ContainerId mockCId3 = ContainerId.newInstance(appId, 2);
-    HeldContainer hc3 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    Container c3 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(c3.getNodeId().getHost()).thenReturn(""); // we are mocking directly
+    HeldContainer hc3 = Mockito.spy(new HeldContainer(c3, 0, 0, null));
     when(hc3.getNode()).thenReturn(node1Rack1);
     when(hc3.getRack()).thenReturn(rack1);
-    when(hc3.getContainer().getId()).thenReturn(mockCId3);
+    when(c3.getId()).thenReturn(mockCId3);
+    when(c3.getResource()).thenReturn(r);
+    when(hc3.getContainer()).thenReturn(c3);
     ContainerId mockCId4 = ContainerId.newInstance(appId, 3);
-    HeldContainer hc4 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    Container c4 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(c4.getNodeId().getHost()).thenReturn(""); // we are mocking directly
+    HeldContainer hc4 = Mockito.spy(new HeldContainer(c4, 0, 0, null));
     when(hc4.getNode()).thenReturn(node2Rack1);
     when(hc4.getRack()).thenReturn(rack1);
-    when(hc4.getContainer().getId()).thenReturn(mockCId4);
+    when(c4.getId()).thenReturn(mockCId4);
+    when(c4.getResource()).thenReturn(r);
+    when(hc4.getContainer()).thenReturn(c4);
     ContainerId mockCId5 = ContainerId.newInstance(appId, 4);
-    HeldContainer hc5 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    Container c5 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(c5.getNodeId().getHost()).thenReturn(""); // we are mocking directly
+    HeldContainer hc5 = Mockito.spy(new HeldContainer(c5, 0, 0, null));
     when(hc5.getNode()).thenReturn(node1Rack2);
     when(hc5.getRack()).thenReturn(rack2);
-    when(hc5.getContainer().getId()).thenReturn(mockCId5);
+    when(c5.getId()).thenReturn(mockCId5);
+    when(c5.getResource()).thenReturn(r);
+    when(hc5.getContainer()).thenReturn(c5);
     ContainerId mockCId6 = ContainerId.newInstance(appId, 5);
-    HeldContainer hc6 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    Container c6 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(c6.getNodeId().getHost()).thenReturn(""); // we are mocking directly
+    HeldContainer hc6 = Mockito.spy(new HeldContainer(c6, 0, 0, null));
     when(hc6.getNode()).thenReturn(node2Rack2);
     when(hc6.getRack()).thenReturn(rack2);
-    when(hc6.getContainer().getId()).thenReturn(mockCId6);
+    when(c6.getId()).thenReturn(mockCId6);
+    when(c6.getResource()).thenReturn(r);
+    when(hc6.getContainer()).thenReturn(c6);
     ContainerId mockCId7 = ContainerId.newInstance(appId, 6);
-    HeldContainer hc7 = mock(HeldContainer.class, RETURNS_DEEP_STUBS);
+    Container c7 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(c7.getNodeId().getHost()).thenReturn(""); // we are mocking directly
+    HeldContainer hc7 = Mockito.spy(new HeldContainer(c7, 0, 0, null));
     when(hc7.getNode()).thenReturn(node1Rack3);
     when(hc7.getRack()).thenReturn(rack3);
-    when(hc7.getContainer().getId()).thenReturn(mockCId7);
-    
+    when(c7.getId()).thenReturn(mockCId7);
+    when(c7.getResource()).thenReturn(r);
+    when(hc7.getContainer()).thenReturn(c7);
+
     scheduler.heldContainers.put(mockCId1, hc1);
     scheduler.heldContainers.put(mockCId2, hc2);
     scheduler.heldContainers.put(mockCId3, hc3);
@@ -1116,6 +1148,19 @@ public class TestTaskScheduler {
     Assert.assertTrue(racks.contains(rack1) && racks.contains(rack2) &&
         racks.contains(rack3));
     
+    long currTime = System.currentTimeMillis();
+    heldContainers.clear();
+    heldContainers.addAll(scheduler.heldContainers.values());
+    for (HeldContainer hc : heldContainers) {
+      when(hc.isNew()).thenReturn(true);
+      scheduler.delayedContainerManager.addDelayedContainer(hc.getContainer(), currTime);
+    }
+    Thread.sleep(1000);
+    drainableAppCallback.drain();
+    // only the 2 container not in min-held containers are released
+    verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
+    Assert.assertEquals(5, scheduler.heldContainers.size());
+
     String appMsg = "success";
     AppFinalStatus finalStatus =
         new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);


Mime
View raw message