tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions (bikas) (cherry picked from commit a06cd76d8ddcee5f7f939cf72e3eeb3cc59033d0)
Date Fri, 18 Sep 2015 22:13:47 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 b939cc38b -> 0ce51cc6a


TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions (bikas)
(cherry picked from commit a06cd76d8ddcee5f7f939cf72e3eeb3cc59033d0)

Conflicts:
	CHANGES.txt
	tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
	tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
	tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0ce51cc6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0ce51cc6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0ce51cc6

Branch: refs/heads/branch-0.7
Commit: 0ce51cc6a8571be19ea675187c3ca44d81930865
Parents: b939cc3
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Sep 18 14:55:27 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Sep 18 15:11:57 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  6 +++
 .../dag/app/rm/YarnTaskSchedulerService.java    | 42 +++++++++++---------
 .../tez/dag/app/rm/TestTaskScheduler.java       | 36 +++++++++++++----
 3 files changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0ce51cc6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5565d47..34e62c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-2097. TEZ-UI Add dag logs backend support
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
@@ -269,7 +270,11 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+<<<<<<< HEAD
   TEZ-2097. TEZ-UI Add dag logs backend support
+=======
+  TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
+>>>>>>> TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions (bikas)
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2809. Minimal distribution compiled on 2.6 fails to run on 2.7
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
@@ -491,6 +496,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
   down an AM.

http://git-wip-us.apache.org/repos/asf/tez/blob/0ce51cc6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index f35a45f..34684f9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -910,8 +910,12 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           " taskAllocations: " + taskAllocations.size());
     }
 
-    numHeartbeats++;
-    preemptIfNeeded();
+    synchronized (this) {
+      numHeartbeats++;
+      if (preemptIfNeeded()) {
+        heartbeatAtLastPreemption = numHeartbeats;
+      }
+    }
 
     return appClientDelegate.getProgress();
   }
@@ -1141,10 +1145,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption;
   }
   
-  void preemptIfNeeded() {
+  boolean preemptIfNeeded() {
     if (preemptionPercentage == 0) {
       // turned off
-      return;
+      return true;
     }
     ContainerId[] preemptedContainers = null;
     int numPendingRequestsToService = 0;
@@ -1176,7 +1180,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       
       if (highestPriRequest == null) {
         // nothing pending
-        return;
+        return true;
       }
       
       if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
@@ -1187,7 +1191,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             LOG.info(highestPriRequest + " fits in free resources");
           }
         }
-        return;
+        return true;
       }
       // highest priority request will not fit in existing free resources
       // free up some more
@@ -1197,7 +1201,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           preemptionPercentage);
 
       if (numPendingRequestsToService < 1) {
-        return;
+        // nothing to preempt - reset preemption last heartbeat
+        return true;
       }
 
       if (LOG.isDebugEnabled()) {
@@ -1205,7 +1210,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             + numHighestPriRequests + " pending requests at pri: "
             + highestPriRequest.getPriority());
       }
-      
+
       for (int i=0; i<numPendingRequestsToService; ++i) {
         // This request must have been considered for matching with all existing 
         // containers when request was made.
@@ -1218,7 +1223,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
               LOG.debug("Reused container exists. Wait for assignment loop to release it.
"
                   + heldContainer.getContainer().getId());
             }
-            return;
+            return true;
           }
           if (heldContainer.geNumAssignmentAttempts() < 3) {
             // we havent tried to assign this container at node/rack/ANY
@@ -1226,7 +1231,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
               LOG.debug("Brand new container. Wait for assignment loop to match it. "
                   + heldContainer.getContainer().getId());
             }
-            return;
+            return true;
           }
           Container container = heldContainer.getContainer();
           if (lowestPriNewContainer == null ||
@@ -1271,18 +1276,14 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       }
       
       if (numPendingRequestsToService < 1) {
-        return;
+        return true;
       }
 
       // there are no reused or new containers to release. try to preempt running containers
       // this assert will be a no-op in production but can help identify 
       // invalid assumptions during testing
       assert delayedContainerManager.delayedContainers.isEmpty();
-      
-      if ((numHeartbeats - heartbeatAtLastPreemption) <= numHeartbeatsBetweenPreemptions)
{
-        return;
-      }
-        
+              
       Priority preemptedTaskPriority = null;
       int numEntriesAtPreemptedPriority = 0;
       for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
@@ -1318,7 +1319,12 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
             numPendingRequestsToService);
         if (numPendingRequestsToService < 1) {
-          return;
+          return true;
+        }
+        // wait for enough heartbeats since this request became active for preemption
+        if ((numHeartbeats - heartbeatAtLastPreemption) < numHeartbeatsBetweenPreemptions)
{
+          // stop incrementing lastpreemption heartbeat count
+          return false;
         }
         LOG.info("Trying to service " + numPendingRequestsToService + " out of total "
             + numHighestPriRequests + " pending requests at pri: "
@@ -1345,7 +1351,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     
     // upcall outside locks
     if (preemptedContainers != null) {
-      heartbeatAtLastPreemption = numHeartbeats;
       for(int i=0; i<numPendingRequestsToService; ++i) {
         ContainerId cId = preemptedContainers[i];
         if (cId != null) {
@@ -1354,6 +1359,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         }
       }
     }
+    return true;
   }
 
   private boolean fitsIn(Resource toFit, Resource resource) {

http://git-wip-us.apache.org/repos/asf/tez/blob/0ce51cc6/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 7e2c674..7286d79 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1505,6 +1505,7 @@ public class TestTaskScheduler {
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     scheduler.init(conf);
+    conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3);
 
     RegisterApplicationMasterResponse mockRegResponse =
                        mock(RegisterApplicationMasterResponse.class);
@@ -1680,7 +1681,10 @@ public class TestTaskScheduler {
     scheduler.getProgress();
     drainableAppCallback.drain();
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
 
+    // add a pending request that cannot be allocated until resources free up
     Object mockTask3WaitCookie = new Object();
     scheduler.allocateTask(mockTask3Wait, taskAsk, null,
                            null, pri6, obj3, mockTask3WaitCookie);
@@ -1699,6 +1703,7 @@ public class TestTaskScheduler {
     containers.clear();
     containers.add(mockContainer4);
     
+    // new lower pri container added that wont be matched and eventually preempted
     // Fudge new container being present in delayed allocation list due to race
     HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null,
         containerSignatureMatcher);
@@ -1707,6 +1712,9 @@ public class TestTaskScheduler {
     scheduler.getProgress();
     drainableAppCallback.drain();
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+
     heldContainer.incrementAssignmentAttempts();
     // no preemption - container assignment attempts < 3
     scheduler.getProgress();
@@ -1728,12 +1736,18 @@ public class TestTaskScheduler {
     
     // remove fudging.
     scheduler.delayedContainerManager.delayedContainers.clear();
-    
+
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+
     scheduler.allocateTask(mockTask3Retry, taskAsk, null,
                            null, pri5, obj3, null);
     // no preemption - higher pri. exact match
     scheduler.getProgress();
+    // no need for task preemption until now - so they should match
     drainableAppCallback.drain();
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
     verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
 
     for (int i=0; i<11; ++i) {
@@ -1746,16 +1760,22 @@ public class TestTaskScheduler {
     // this is also a higher priority container than the pending task priority but was running a 
     // lower priority task. Task priority is relevant for preemption and not container priority as
     // containers can run tasks of different priorities
-    scheduler.getProgress();
+    scheduler.getProgress(); // first heartbeat
+    Assert.assertTrue(scheduler.numHeartbeats > scheduler.heartbeatAtLastPreemption);
+    drainableAppCallback.drain();
+    scheduler.getProgress(); // second heartbeat
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+    scheduler.getProgress(); // third heartbeat
     drainableAppCallback.drain();
     verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
     verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3B);
-    // next 3 heartbeats do nothing, waiting for the RM to act on the last released resources
-    scheduler.getProgress();
-    scheduler.getProgress();
-    scheduler.getProgress();
-    verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
-    scheduler.getProgress();
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+    // there are pending preemptions.
+    scheduler.getProgress(); // first heartbeat
+    scheduler.getProgress(); // second heartbeat
+    verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId) any());
+    scheduler.getProgress(); // third heartbeat
     drainableAppCallback.drain();
     // Next oldest mockTaskPri3KillA gets preempted to clear 10% of outstanding running preemptable tasks
     verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId)any());


Mime
View raw message