tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2834. Make Tez preemption resilient to incorrect free resource reported by YARN (bikas) (cherry picked from commit 62d9853b35f41d73cca16d2004e321195da681f4)
Date Mon, 21 Sep 2015 18:43:00 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 1e6e47a2f -> 01ad2a7ce


TEZ-2834. Make Tez preemption resilient to incorrect free resource reported by YARN (bikas)
(cherry picked from commit 62d9853b35f41d73cca16d2004e321195da681f4)

Conflicts:
	CHANGES.txt
  tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
  tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
  tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/01ad2a7c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/01ad2a7c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/01ad2a7c

Branch: refs/heads/branch-0.7
Commit: 01ad2a7ce12e88bb536af54fd9b9d483943b8338
Parents: 1e6e47a
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Sep 21 11:19:12 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Sep 21 11:42:21 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  12 +-
 .../apache/tez/dag/api/TezConfiguration.java    |  11 +
 .../dag/app/rm/YarnTaskSchedulerService.java    |  85 ++++++-
 .../tez/dag/app/rm/TestTaskScheduler.java       | 231 ++++++++++++++++++-
 .../org/apache/tez/test/TestFaultTolerance.java |  20 ++
 .../java/org/apache/tez/test/TestInput.java     |  12 +
 6 files changed, 360 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/01ad2a7c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c5f4735..0f4789c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,7 +5,9 @@ Release 0.7.1: Unreleased
 
 INCOMPATIBLE CHANGES
 
-ALL CHANGES:
+ALL CHANGES
+  TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
+  by YARN
   TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-2097. TEZ-UI Add dag logs backend support
   TEZ-814.  Improve heuristic for determining a task has failed outputs
@@ -269,6 +271,12 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+<<<<<<< HEAD
+=======
+  TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
+  by YARN
+  TEZ-2097. TEZ-UI Add dag logs backend support
+>>>>>>> 62d9853... TEZ-2834. Make Tez preemption resilient to incorrect free resource reported by YARN (bikas)
   TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-2097. TEZ-UI Add dag logs backend support
   TEZ-814.  Improve heuristic for determining a task has failed outputs
@@ -492,6 +500,8 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
+  by YARN
   TEZ-2812. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting

http://git-wip-us.apache.org/repos/asf/tez/blob/01ad2a7c/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index d0a76d0..13d7ea3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -825,6 +825,17 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT = 3;
 
   /**
+   * Int value. Time (in millisecs) that an unsatisfied request will wait before preempting other
+   * resources. In rare cases, the cluster says there are enough free resources but does not end 
+   * up getting enough on a node to actually assign it to the job. This configuration tries to put 
+   * a deadline on such wait to prevent indefinite job hangs.
+   */
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS =
+      TEZ_AM_PREFIX + "preemption.max.wait-time-ms";
+  public static final int TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS_DEFAULT = 60*1000; // 60s
+
+  /**
    * String value to a file path.
    * The location of the Tez libraries which will be localized for DAGs.
    * This follows the following semantics

http://git-wip-us.apache.org/repos/asf/tez/blob/01ad2a7c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 34684f9..54c297b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -98,8 +98,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   private boolean reuseRackLocal;
   private boolean reuseNonLocal;
 
+  // type is linked hash map to maintain order of incoming requests
   Map<Object, CookieContainerRequest> taskRequests =
-                  new HashMap<Object, CookieContainerRequest>();
+                  new LinkedHashMap<Object, CookieContainerRequest>();
   // LinkedHashMap is need in getProgress()
   LinkedHashMap<Object, Container> taskAllocations =
                   new LinkedHashMap<Object, Container>();
@@ -146,7 +147,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   long idleContainerTimeoutMin;
   long idleContainerTimeoutMax = 0;
   int sessionNumMinHeldContainers = 0;
-  int preemptionPercentage = 0; 
+  int preemptionPercentage = 0;
+  long preemptionMaxWaitTime = 0;
+  
+  long highestWaitingRequestWaitStartTime = 0;
+  Priority highestWaitingRequestPriority = null;
   
   Set<ContainerId> sessionMinHeldContainers = Sets.newHashSet();
   
@@ -346,6 +351,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT);
     Preconditions.checkArgument(numHeartbeatsBetweenPreemptions >= 1, 
         "Heartbeats between preemptions should be >=1");
+    
+    preemptionMaxWaitTime = conf.getInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS,
+        TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS_DEFAULT);
+    Preconditions.checkArgument(preemptionMaxWaitTime >=0, "Preemption max wait time must be >=0");
 
     delayedContainerManager = new DelayedContainerManager();
     LOG.info("YarnTaskScheduler initialized with configuration: " +
@@ -355,6 +364,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             ", reuseNonLocal: " + reuseNonLocal + 
             ", localitySchedulingDelay: " + localitySchedulingDelay +
             ", preemptionPercentage: " + preemptionPercentage +
+            ", preemptionMaxWaitTime: " + preemptionMaxWaitTime +
             ", numHeartbeatsBetweenPreemptions: " + numHeartbeatsBetweenPreemptions +
             ", idleContainerMinTimeout: " + idleContainerTimeoutMin +
             ", idleContainerMaxTimeout: " + idleContainerTimeoutMax +
@@ -1142,7 +1152,16 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       " Free: " + freeResource +
       " pendingRequests: " + taskRequests.size() +
       " delayedContainers: " + delayedContainerManager.delayedContainers.size() +
-      " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption;
+      " heartbeats: " + numHeartbeats + 
+      " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption +
+      ((highestWaitingRequestPriority != null) ? 
+      (" highestWaitingRequestWaitStartTime: " + highestWaitingRequestWaitStartTime +
+      " highestWaitingRequestPriority: " + highestWaitingRequestPriority.toString()) : "");
+  }
+  
+  private void resetHighestWaitingPriority(Priority newPri) {
+    highestWaitingRequestPriority = newPri;
+    highestWaitingRequestWaitStartTime = 0;
   }
   
   boolean preemptIfNeeded() {
@@ -1180,10 +1199,26 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       
       if (highestPriRequest == null) {
         // nothing pending
+        resetHighestWaitingPriority(null);
         return true;
       }
       
-      if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
+      // reset the wait time when waiting priority changes to prevent carry over of the value
+      if (highestWaitingRequestPriority == null ||
+          !highestPriRequest.getPriority().equals(highestWaitingRequestPriority)) {
+        resetHighestWaitingPriority(highestPriRequest.getPriority());
+      }
+      
+      long currTime = System.currentTimeMillis();
+      if (highestWaitingRequestWaitStartTime == 0) {
+        highestWaitingRequestWaitStartTime = currTime;
+      }
+
+      boolean preemptionWaitDeadlineCrossed = 
+          (currTime - highestWaitingRequestWaitStartTime) > preemptionMaxWaitTime ? true : false;
+
+      if(!preemptionWaitDeadlineCrossed && 
+          fitsIn(highestPriRequest.getCapability(), freeResources)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(highestPriRequest + " fits in free resources");
         } else {
@@ -1193,6 +1228,42 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         }
         return true;
       }
+      
+      if (preemptionWaitDeadlineCrossed) {
+        // check if anything lower priority is running - priority inversion
+        // this check could have been done earlier but in the common case
+        // this would be unnecessary since there are usually requests pending
+        // in the normal case without priority inversion. So do this expensive
+        // iteration now
+        boolean lowerPriRunning = false;
+        for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
+          HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
+          CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
+          Priority taskPriority = lastTaskInfo.getPriority();
+          Object signature = lastTaskInfo.getCookie().getContainerSignature();
+          if(isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
+            // lower priority task is running
+            if (containerSignatureMatcher.isExactMatch(
+                highestPriRequest.getCookie().getContainerSignature(),
+                signature)) {
+              // exact match with different priorities
+              continue;
+            }
+            lowerPriRunning = true;
+            break;
+          }
+        }
+        if (!lowerPriRunning) {
+          // nothing lower priority running
+          // normal case of many pending request without priority inversion
+          resetHighestWaitingPriority(null);
+          return true;
+        }
+        LOG.info("Preemption deadline crossed at pri: " + highestPriRequest.getPriority()
+            + " numRequests: " + numHighestPriRequests + ". "
+            + constructPreemptionPeriodicLog(freeResources));
+      }
+      
       // highest priority request will not fit in existing free resources
       // free up some more
       // TODO this is subject to error wrt RM resource normalization
@@ -1283,7 +1354,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       // this assert will be a no-op in production but can help identify 
       // invalid assumptions during testing
       assert delayedContainerManager.delayedContainers.isEmpty();
-              
+      if (!delayedContainerManager.delayedContainers.isEmpty()) {
+        LOG.warn("Expected delayed containers to be empty. "
+            + constructPreemptionPeriodicLog(freeResources));
+      }
+      
       Priority preemptedTaskPriority = null;
       int numEntriesAtPreemptedPriority = 0;
       for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/01ad2a7c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 7286d79..ff5ad44 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1531,11 +1531,13 @@ public class TestTaskScheduler {
     Object mockTask3Retry = mock(Object.class);
     Object mockTask3KillA = mock(Object.class);
     Object mockTask3KillB = mock(Object.class);
+    Object mockTaskPri8 = mock(Object.class);
     Object obj3 = new Object();
     Priority pri2 = Priority.newInstance(2);
     Priority pri4 = Priority.newInstance(4);
     Priority pri5 = Priority.newInstance(5);
     Priority pri6 = Priority.newInstance(6);
+    Priority pri8 = Priority.newInstance(8);
 
     ArgumentCaptor<CookieContainerRequest> requestCaptor =
         ArgumentCaptor.forClass(CookieContainerRequest.class);
@@ -1688,12 +1690,16 @@ public class TestTaskScheduler {
     Object mockTask3WaitCookie = new Object();
     scheduler.allocateTask(mockTask3Wait, taskAsk, null,
                            null, pri6, obj3, mockTask3WaitCookie);
+    // add a pri 8 request for the pri 8 container that will not be matched
+    Object mockTaskPri8Cookie = new Object();
+    scheduler.allocateTask(mockTaskPri8, taskAsk, null,
+                           null, pri8, obj3, mockTaskPri8Cookie);
     // no preemption - same pri
     scheduler.getProgress();
     drainableAppCallback.drain();
+    verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
     
-    Priority pri8 = Priority.newInstance(8);
     Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer4.getNodeId().getHost()).thenReturn("host1");
     when(mockContainer4.getResource()).thenReturn(taskAsk);
@@ -1727,12 +1733,12 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
     verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
     verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId4);
-    verify(mockRMClient, times(5)).
-    addContainerRequest(requestCaptor.capture());
+    // internally re-request pri8 task request because we release pri8 new container
+    verify(mockRMClient, times(7)).addContainerRequest(requestCaptor.capture());
     CookieContainerRequest reAdded = requestCaptor.getValue();
-    Assert.assertEquals(pri6, reAdded.getPriority());
+    Assert.assertEquals(pri8, reAdded.getPriority());
     Assert.assertEquals(taskAsk, reAdded.getCapability());
-    Assert.assertEquals(mockTask3WaitCookie, reAdded.getCookie().getAppCookie());
+    Assert.assertEquals(mockTaskPri8Cookie, reAdded.getCookie().getAppCookie());
     
     // remove fudging.
     scheduler.delayedContainerManager.delayedContainers.clear();
@@ -1788,6 +1794,221 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
     scheduler.close();
   }
+  
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test (timeout=5000)
+  public void testTaskSchedulerPreemption2() throws Exception {
+    RackResolver.init(new YarnConfiguration());
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+    AppContext mockAppContext = mock(AppContext.class);
+    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
+
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+                                                  mock(TezAMRMClientAsync.class);
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+    final TaskSchedulerWithDrainableAppCallback scheduler =
+      new TaskSchedulerWithDrainableAppCallback(
+        mockApp, new PreemptionMatcher(), appHost, appPort,
+        appUrl, mockRMClient, mockAppContext);
+    TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
+        .getDrainableAppCallback();
+
+    int waitTime = 1000;
+    
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 2);
+    conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS, waitTime);
+    scheduler.init(conf);
+    conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3);
+
+    RegisterApplicationMasterResponse mockRegResponse =
+                       mock(RegisterApplicationMasterResponse.class);
+    when(
+        mockRMClient.registerApplicationMaster(anyString(), anyInt(),
+            anyString())).thenReturn(mockRegResponse);
+
+    scheduler.start();
+    Resource totalResource = Resource.newInstance(4000, 4);
+    when(mockRMClient.getAvailableResources()).thenReturn(totalResource);
+
+    // no preemption
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    Assert.assertEquals(totalResource, scheduler.getTotalResources());
+    verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+
+    // allocate task
+    Object mockTask1 = mock(Object.class);
+    Object mockTask2 = mock(Object.class);
+    Object mockTask3 = mock(Object.class);
+    Object obj3 = new Object();
+    Priority pri2 = Priority.newInstance(2);
+    Priority pri4 = Priority.newInstance(4);
+    Priority pri6 = Priority.newInstance(6);
+
+    ArgumentCaptor<CookieContainerRequest> requestCaptor =
+        ArgumentCaptor.forClass(CookieContainerRequest.class);
+    final ArrayList<CookieContainerRequest> anyContainers =
+        new ArrayList<CookieContainerRequest>();
+
+    
+    Resource taskAsk = Resource.newInstance(1024, 1);
+    scheduler.allocateTask(mockTask1, taskAsk, null,
+                           null, pri4, null, null);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(1)).
+        addContainerRequest(requestCaptor.capture());
+    anyContainers.add(requestCaptor.getValue());
+
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    Assert.assertEquals(totalResource, scheduler.getTotalResources());
+    verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+
+    final List<ArrayList<CookieContainerRequest>> anyList =
+        new LinkedList<ArrayList<CookieContainerRequest>>();
+    final List<ArrayList<CookieContainerRequest>> emptyList =
+        new LinkedList<ArrayList<CookieContainerRequest>>();
+
+    anyList.add(anyContainers);
+    List<Container> containers = new ArrayList<Container>();
+    Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
+    when(mockContainer1.getResource()).thenReturn(taskAsk);
+    when(mockContainer1.getPriority()).thenReturn(pri4);
+    ContainerId mockCId1 = mock(ContainerId.class);
+    when(mockContainer1.getId()).thenReturn(mockCId1);
+    containers.add(mockContainer1);
+    when(
+        mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+        });
+    // RackResolver by default puts hosts in default-rack
+    when(
+        mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"),
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+        });
+    when(
+        mockRMClient.getMatchingRequests((Priority) any(),
+            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          int calls = 0;
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            if(calls > 0) {
+              anyContainers.remove(0);
+            }
+            calls++;
+            return anyList;
+          }
+        });
+    
+    Mockito.doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          ContainerId cId = (ContainerId) args[0];
+          scheduler.deallocateContainer(cId);
+          return null;
+      }})
+    .when(mockApp).preemptContainer((ContainerId)any());
+    
+    scheduler.onContainersAllocated(containers);
+    drainableAppCallback.drain();
+    Assert.assertEquals(1, scheduler.taskAllocations.size());
+    Assert.assertEquals(mockCId1,
+        scheduler.taskAllocations.get(mockTask1).getId());
+
+    // no preemption
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+
+    // add a pending request that cannot be allocated until resources free up
+    Object mockTask2Cookie = new Object();
+    scheduler.allocateTask(mockTask2, taskAsk, null,
+                           null, pri2, obj3, mockTask2Cookie);
+    Object mockTask3Cookie = new Object();
+    scheduler.allocateTask(mockTask3, taskAsk, null,
+                           null, pri6, obj3, mockTask3Cookie);
+    // nothing waiting till now
+    Assert.assertNull(scheduler.highestWaitingRequestPriority);
+    Assert.assertEquals(0, scheduler.highestWaitingRequestWaitStartTime);
+
+    long currTime = System.currentTimeMillis();
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    // enough free resources. preemption not triggered
+    Assert.assertEquals(pri2, scheduler.highestWaitingRequestPriority);
+    Assert.assertTrue(scheduler.highestWaitingRequestWaitStartTime >= currTime);
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+    
+    Thread.sleep(waitTime + 10);
+    long oldStartWaitTime = scheduler.highestWaitingRequestWaitStartTime;
+    
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    // enough free resources. deadline crossed. preemption triggered
+    Assert.assertEquals(pri2, scheduler.highestWaitingRequestPriority);
+    Assert.assertEquals(oldStartWaitTime, scheduler.highestWaitingRequestWaitStartTime);
+    Assert.assertTrue(scheduler.numHeartbeats > scheduler.heartbeatAtLastPreemption);
+
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+    verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId1);
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+    // maintains existing waiting values
+    Assert.assertEquals(pri2, scheduler.highestWaitingRequestPriority);
+    Assert.assertEquals(oldStartWaitTime, scheduler.highestWaitingRequestWaitStartTime);
+
+    // remove high pri request to test waiting pri change
+    scheduler.deallocateTask(mockTask2, false);
+    
+    scheduler.getProgress();
+    // waiting value changes
+    Assert.assertEquals(pri6, scheduler.highestWaitingRequestPriority);
+    Assert.assertTrue(oldStartWaitTime < scheduler.highestWaitingRequestWaitStartTime);
+
+    Thread.sleep(waitTime + 10);
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    // deadlines crossed but nothing lower pri running. so reset
+    Assert.assertNull(scheduler.highestWaitingRequestPriority);
+    Assert.assertEquals(0, scheduler.highestWaitingRequestWaitStartTime);
+
+    scheduler.getProgress();
+    drainableAppCallback.drain();
+    // waiting value changes
+    Assert.assertEquals(pri6, scheduler.highestWaitingRequestPriority);
+    Assert.assertTrue(oldStartWaitTime < scheduler.highestWaitingRequestWaitStartTime);
+
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.close();
+    drainableAppCallback.drain();
+  }
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/01ad2a7c/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index d03dd18..0d27032 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -88,6 +88,8 @@ public class TestFaultTolerance {
       tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
           remoteStagingDir.toString());
       tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+      tezConf.setDouble(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, 0.4);
+      tezConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, 3);
 
       tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
       tezSession.start();
@@ -271,6 +273,24 @@ public class TestFaultTolerance {
   }
   
   @Test (timeout=60000)
+  public void testBasicInputFailureWithoutExitDeadline() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 3); // 1 error < 0.4 fail fraction
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "2");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+    
+    DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExitDeadline", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+
+  
+  @Test (timeout=60000)
   public void testMultipleInputFailureWithoutExit() throws Exception {
     Configuration testConf = new Configuration(false);
     testConf.setBoolean(TestInput.getVertexConfName(

http://git-wip-us.apache.org/repos/asf/tez/blob/01ad2a7c/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index eeb565c..fb42c8e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -75,6 +75,7 @@ public class TestInput extends AbstractLogicalInput {
   Set<Integer> failingInputIndices = Sets.newHashSet();
   Integer failAll = new Integer(-1);
   int[] inputValues;
+  AtomicInteger numEventsReceived = new AtomicInteger(0);
   
   /**
    * Enable failure for this logical input
@@ -192,12 +193,22 @@ public class TestInput extends AbstractLogicalInput {
                 LOG.info("Failing input: " + msg);
               }
             }
+            int numEvents = numEventsReceived.get();
             getContext().sendEvents(events);
             if (doFailAndExit) {
               String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier();
               LOG.info(msg);
               throwException(msg);
             } else {
+              try {
+                while (numEvents == numEventsReceived.get()) {
+                  // keep sending events
+                  Thread.sleep(500);
+                  getContext().sendEvents(events);
+                }
+              } catch (InterruptedException e) {
+                LOG.info("Interrupted while sending events", e);
+              }
               done = false;
             }
           } else if ((failingTaskIndices.contains(failAll) ||
@@ -330,6 +341,7 @@ public class TestInput extends AbstractLogicalInput {
   @Override
   public void handleEvents(List<Event> inputEvents) throws Exception {
     for (Event event : inputEvents) {
+      numEventsReceived.incrementAndGet();
       if (event instanceof DataMovementEvent) {
         DataMovementEvent dmEvent = (DataMovementEvent) event;
         numCompletedInputs++;


Mime
View raw message