tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [14/35] tez git commit: TEZ-2192. Relocalization does not check for source. (hitesh)
Date Tue, 07 Apr 2015 20:12:32 GMT
TEZ-2192. Relocalization does not check for source. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7aeebfe7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7aeebfe7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7aeebfe7

Branch: refs/heads/TEZ-2003
Commit: 7aeebfe7cd07786d2040b568a924fa58783c7ba0
Parents: 26518d5
Author: Hitesh Shah <hitesh@apache.org>
Authored: Mon Apr 6 14:38:03 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Mon Apr 6 14:38:03 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/ContainerContext.java    |  73 ++++++++
 .../dag/app/rm/YarnTaskSchedulerService.java    |  41 +++--
 .../rm/container/ContainerContextMatcher.java   |   7 +
 .../rm/container/ContainerSignatureMatcher.java |  12 ++
 .../tez/dag/app/rm/TestContainerReuse.java      | 167 ++++++++++++++++++-
 .../tez/dag/app/rm/TestTaskScheduler.java       |  21 ++-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  10 ++
 8 files changed, 308 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ee0ef70..be2e617 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -263,6 +263,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2192. Relocalization does not check for source.
   TEZ-2224. EventQueue empty doesn't mean events are consumed in RecoveryService
   TEZ-2240. Fix toUpperCase/toLowerCase to use Locale.ENGLISH.
   TEZ-2238. TestContainerReuse flaky

http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
index e7f1a10..f00b27b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
@@ -23,6 +23,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import javax.annotation.Nullable;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.Credentials;
@@ -142,6 +145,11 @@ public class ContainerContext {
     for (Entry<String, LocalResource> additionalLREntry : reqLRsCopy.entrySet()) {
       LocalResource lr = additionalLREntry.getValue();
       if (EnumSet.of(LocalResourceType.ARCHIVE, LocalResourceType.PATTERN).contains(lr.getType())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cannot match container: Additional local resource needed is not of type FILE"
+              + ", resourceName: " + additionalLREntry.getKey()
+              + ", resourceDetails: " + additionalLREntry);
+        }
         return false;
       }
     }
@@ -177,4 +185,69 @@ public class ContainerContext {
     }
     return true;
   }
+
+  /**
+   * Create a new ContainerContext to account for container re-use. On re-use, there is
+   * re-localization of additional LocalResources. Also, a task from a different vertex could be
+   * run on the given container.
+   *
+   * Only a merge of local resources is needed as:
+   *
+   * credentials are modified at run-time based on the task spec.
+   * the environment for a container cannot be changed. A re-used container is always
+   * expected to have a super-set.
+   * javaOpts have to be identical for re-use.
+   *
+   * Vertex should be overridden to account for the new task being scheduled to run on this
+   * container context.
+   *
+   * @param c1 ContainerContext 1 Original task's context
+   * @param c2 ContainerContext 2 Newly assigned task's context
+   * @return Merged ContainerContext
+   */
+  public static ContainerContext union(ContainerContext c1, ContainerContext c2) {
+    HashMap<String, LocalResource> mergedLR = new HashMap<String, LocalResource>();
+    mergedLR.putAll(c1.getLocalResources());
+    mergedLR.putAll(c2.getLocalResources());
+    ContainerContext union = new ContainerContext(mergedLR, c1.credentials, c1.environment,
+        c1.javaOpts, c2.vertex);
+    return union;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+
+    sb.append("LocalResources: [");
+    if (localResources != null) {
+      for (Map.Entry<String, LocalResource> lr : localResources.entrySet()) {
+        sb.append("[ name=")
+            .append(lr.getKey())
+            .append(", value=")
+            .append(lr.getValue())
+            .append("],");
+      }
+    }
+    sb.append("], environment: [");
+    if (environment != null) {
+      for (Map.Entry<String, String> entry : environment.entrySet()) {
+        sb.append("[ ").append(entry.getKey()).append("=").append(entry.getValue())
+            .append(" ],");
+      }
+    }
+    sb.append("], credentials(token kinds): [");
+    if (credentials != null) {
+      for (Token<? extends TokenIdentifier> t : credentials.getAllTokens()) {
+        sb.append(t.getKind().toString())
+            .append(",");
+      }
+    }
+    sb.append("], javaOpts: ")
+      .append(javaOpts)
+      .append(", vertex: ")
+      .append(( vertex == null ? "null" : vertex.getLogIdentifier()));
+
+    return sb.toString();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index ff90e5d..66a6f33 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -1386,7 +1386,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             + " containerId=" + heldContainer.container.getId());
       }
       if (containerSignatureMatcher.isSuperSet(heldContainer
-          .getFirstContainerSignature(), cookieContainerRequest.getCookie()
+          .getLastAssignedContainerSignature(), cookieContainerRequest.getCookie()
           .getContainerSignature())) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Matched delayed container to task"
@@ -1441,7 +1441,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     HeldContainer heldContainer = heldContainers.get(container.getId()); 
     if (!shouldReuseContainers && heldContainer == null) {
       heldContainers.put(container.getId(), new HeldContainer(container,
-        -1, -1, assigned));
+        -1, -1, assigned, this.containerSignatureMatcher));
       Resources.addTo(allocatedResources, container.getResource());
     } else {
       if (heldContainer.isNew()) {
@@ -1451,7 +1451,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         // think about preferring within vertex matching etc.
         heldContainers.put(container.getId(),
             new HeldContainer(container, heldContainer.getNextScheduleTime(),
-                heldContainer.getContainerExpiryTime(), assigned));
+                heldContainer.getContainerExpiryTime(), assigned, this.containerSignatureMatcher));
       }
       heldContainer.setLastTaskInfo(assigned);
     }
@@ -1463,7 +1463,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     synchronized (delayedContainerManager) {
       for (Container container : containers) {
         if (heldContainers.put(container.getId(), new HeldContainer(container,
-            -1, expireTime, null)) != null) {
+            -1, expireTime, null, this.containerSignatureMatcher)) != null) {
           throw new TezUncheckedException("New container " + container.getId()
               + " is already held.");
         }
@@ -2116,33 +2116,36 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       NON_LOCAL
     }
 
-    Container container;
-    private String rack;
+    final Container container;
+    final private String rack;
     private long nextScheduleTime;
-    private Object firstContainerSignature;
     private LocalityMatchLevel localityMatchLevel;
     private long containerExpiryTime;
     private CookieContainerRequest lastTaskInfo;
     private int numAssignmentAttempts = 0;
+    private Object lastAssignedContainerSignature;
+    final ContainerSignatureMatcher signatureMatcher;
     
     HeldContainer(Container container,
         long nextScheduleTime,
         long containerExpiryTime,
-        CookieContainerRequest firstTaskInfo) {
+        CookieContainerRequest firstTaskInfo,
+        ContainerSignatureMatcher signatureMatcher) {
       this.container = container;
       this.nextScheduleTime = nextScheduleTime;
       if (firstTaskInfo != null) {
         this.lastTaskInfo = firstTaskInfo;
-        this.firstContainerSignature = firstTaskInfo.getCookie().getContainerSignature();
+        this.lastAssignedContainerSignature = firstTaskInfo.getCookie().getContainerSignature();
       }
       this.localityMatchLevel = LocalityMatchLevel.NODE;
       this.containerExpiryTime = containerExpiryTime;
       this.rack = RackResolver.resolve(container.getNodeId().getHost())
           .getNetworkLocation();
+      this.signatureMatcher = signatureMatcher;
     }
     
     boolean isNew() {
-      return firstContainerSignature == null;
+      return lastTaskInfo == null;
     }
     
     String getRack() {
@@ -2181,15 +2184,24 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       this.containerExpiryTime = containerExpiryTime;
     }
 
-    public Object getFirstContainerSignature() {
-      return this.firstContainerSignature;
+    public Object getLastAssignedContainerSignature() {
+      return this.lastAssignedContainerSignature;
     }
-    
+
     public CookieContainerRequest getLastTaskInfo() {
       return this.lastTaskInfo;
     }
     
     public void setLastTaskInfo(CookieContainerRequest taskInfo) {
+      // Merge the container signatures to account for any changes to the container
+      // footprint. For example, re-localization of additional resources will
+      // cause the held container's signature to change.
+      lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature();
+      if (lastTaskInfo != null && lastTaskInfo.getCookie().getContainerSignature() != null) {
+        lastAssignedContainerSignature = signatureMatcher.union(
+            lastTaskInfo.getCookie().getContainerSignature(),
+            taskInfo.getCookie().getContainerSignature());
+      }
       lastTaskInfo = taskInfo;
     }
 
@@ -2220,7 +2232,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           + ", nextScheduleTime: " + nextScheduleTime
           + ", localityMatchLevel=" + localityMatchLevel
           + ", signature: "
-          + (firstContainerSignature != null? firstContainerSignature.toString():"null");
+          + (lastAssignedContainerSignature != null? lastAssignedContainerSignature.toString()
+            : "null");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
index 6d65a67..211c537 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
@@ -70,4 +70,11 @@ public class ContainerContextMatcher implements ContainerSignatureMatcher {
     }
     return c2LocalResources;
   }
+
+  @Override
+  public Object union(Object cs1, Object cs2) {
+    checkArguments(cs1, cs2);
+    return ContainerContext.union((ContainerContext) cs1, (ContainerContext) cs2);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
index df17564..0f9c2d6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
@@ -45,4 +45,16 @@ public interface ContainerSignatureMatcher {
    */
   public Map<String, LocalResource> getAdditionalResources(Map<String, LocalResource> lr1,
       Map<String, LocalResource> lr2);
+
+
+  /**
+   * Do a union of 2 signatures
+   * Pre-condition. This function should only be invoked iff cs1 is compatible with cs2.
+   * i.e. isSuperSet should not return false.
+   * @param cs1 Signature 1 Original signature
+   * @param cs2 Signature 2 New signature
+   * @return Union of 2 signatures
+   */
+  public Object union(Object cs1, Object cs2);
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index c70003b..89b77a7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1028,7 +1028,7 @@ public class TestContainerReuse {
     LocalResource lr1 = mock(LocalResource.class);
     LocalResource lr2 = mock(LocalResource.class);
     LocalResource lr3 = mock(LocalResource.class);
-    
+
     AMContainerEventAssignTA assignEvent = null;
     
     Map<String, LocalResource> dag1LRs = Maps.newHashMap();
@@ -1091,7 +1091,7 @@ public class TestContainerReuse {
     Map<String, LocalResource> dag2LRs = Maps.newHashMap();
     dag2LRs.put(rsrc2, lr2);
     dag2LRs.put(rsrc3, lr3);
-    
+
     TezVertexID vertexID21 = TezVertexID.getInstance(dagID2, 1);
     
     //Vertex 2, Task 1, Attempt 1, host1, lr2
@@ -1132,6 +1132,169 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.close();
   }
 
+  @Test(timeout = 30000l)
+  public void testReuseConflictLocalResources() throws IOException, InterruptedException, ExecutionException {
+    LOG.info("Test testReuseLocalResourcesChanged");
+    Configuration tezConf = new Configuration(new YarnConfiguration());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, true);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1);
+
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+
+    CapturingEventHandler eventHandler = new CapturingEventHandler();
+    TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0);
+
+    AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
+    TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
+    String appUrl = "url";
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+
+    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
+
+    AppContext appContext = mock(AppContext.class);
+    doReturn(new Configuration(false)).when(appContext).getAMConf();
+    ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1);
+    AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
+        mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+    doReturn(true).when(appContext).isSession();
+    doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID();
+    doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
+        new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+            new ContainerContextMatcher());
+    TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
+    taskSchedulerEventHandler.init(tezConf);
+    taskSchedulerEventHandler.start();
+
+    TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+        .getSpyTaskScheduler();
+    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+
+    Resource resource1 = Resource.newInstance(1024, 1);
+    String[] host1 = {"host1"};
+
+    String []racks = {"/default-rack"};
+    Priority priority1 = Priority.newInstance(1);
+
+    String rsrc1 = "rsrc1";
+    String rsrc2 = "rsrc2";
+    LocalResource lr1 = mock(LocalResource.class);
+    LocalResource lr2 = mock(LocalResource.class);
+    LocalResource lr3 = mock(LocalResource.class);
+
+    AMContainerEventAssignTA assignEvent = null;
+
+    Map<String, LocalResource> v11LR = Maps.newHashMap();
+    v11LR.put(rsrc1, lr1);
+
+    TezVertexID vertexID11 = TezVertexID.getInstance(dagID1, 1);
+
+    //Vertex 1, Task 1, Attempt 1, host1, lr1
+    TezTaskAttemptID taID111 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 1), 1);
+    TaskAttempt ta111 = mock(TaskAttempt.class);
+    doReturn(taID111).when(ta111).getID();
+    doReturn("Mock for TA " + taID111.toString()).when(ta111).toString();
+    AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(taID111, ta111, resource1, host1, racks, priority1, v11LR);
+
+    Map<String, LocalResource> v12LR = Maps.newHashMap();
+    v12LR.put(rsrc1, lr1);
+    v12LR.put(rsrc2, lr2);
+
+    //Vertex 1, Task 2, Attempt 1, host1, lr1
+    TezTaskAttemptID taID112 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 2), 1);
+    TaskAttempt ta112 = mock(TaskAttempt.class);
+    doReturn(taID112).when(ta112).getID();
+    doReturn("Mock for TA " + taID112.toString()).when(ta112).toString();
+    AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, v12LR);
+
+    drainNotifier.set(false);
+    taskSchedulerEventHandler.handleEvent(lrEvent11);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainNotifier.set(false);
+    taskSchedulerEventHandler.handleEvent(lrEvent12);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+
+    Container container1 = createContainer(1, "host1", resource1, priority1);
+    Container container2 = createContainer(2, "host1", resource1, priority1);
+
+    // One container allocated.
+    drainNotifier.set(false);
+    taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerEventHandler).taskAllocated(eq(ta111), any(Object.class), eq(container1));
+    assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
+    assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
+
+    // Task assigned to container completed successfully. Container should be re-used.
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED));
+    drainableAppCallback.drain();
+    verify(taskScheduler).deallocateTask(eq(ta111), eq(true));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
+    assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
+    eventHandler.reset();
+
+    // Task assigned to container completed successfully.
+    // Verify reuse across hosts.
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED));
+    drainableAppCallback.drain();
+    verify(taskScheduler).deallocateTask(eq(ta112), eq(true));
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    // Setup DAG2 with additional resources. Make sure the container, even without all resources, is reused.
+    TezDAGID dagID2 = TezDAGID.getInstance("0", 2, 0);
+    dagIDAnswer.setDAGID(dagID2);
+
+    Map<String, LocalResource> v21LR = Maps.newHashMap();
+    v21LR.put(rsrc1, lr1);
+    v21LR.put(rsrc2, lr3);
+
+    TezVertexID vertexID21 = TezVertexID.getInstance(dagID2, 1);
+
+    //Vertex 2, Task 1, Attempt 1, host1, lr2
+    TezTaskAttemptID taID211 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID21, 1), 1);
+    TaskAttempt ta211 = mock(TaskAttempt.class);
+    doReturn(taID211).when(ta211).getID();
+    doReturn("Mock for TA " + taID211.toString()).when(ta211).toString();
+    AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID211, ta211, resource1,
+        host1, racks, priority1, v21LR);
+
+    taskSchedulerEventHandler.handleEvent(lrEvent21);
+    drainableAppCallback.drain();
+
+    // TODO This is terrible, need a better way to ensure the scheduling loop has run
+    LOG.info("Sleeping to ensure that the scheduling loop runs");
+    Thread.sleep(6000l);
+    drainNotifier.set(false);
+    taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+
+    Thread.sleep(6000l);
+    verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(ta211), any(Object.class), eq(container2));
+    eventHandler.reset();
+
+    taskScheduler.close();
+    taskSchedulerEventHandler.close();
+  }
+
+
   private Container createContainer(int id, String host, Resource resource, Priority priority) {
     ContainerId containerID = ContainerId.newInstance(
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),

http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 21bce6d..dabae67 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -74,6 +74,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallba
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher;
+import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -90,6 +91,8 @@ public class TestTaskScheduler {
   RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
+  static ContainerSignatureMatcher containerSignatureMatcher = new AlwaysMatchesContainerMatcher();
+
   @BeforeClass
   public static void beforeClass() {
     MockDNSToSwitchMapping.initializeMockRackResolver();
@@ -527,6 +530,7 @@ public class TestTaskScheduler {
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
     // to release immediately after deallocate
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
     scheduler.init(conf);
     drainableAppCallback.drain();
 
@@ -1049,7 +1053,7 @@ public class TestTaskScheduler {
     ContainerId mockCId1 = ContainerId.newInstance(appId, 0);
     Container c1 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(c1.getNodeId().getHost()).thenReturn(""); // we are mocking directly
-    HeldContainer hc1 = Mockito.spy(new HeldContainer(c1, 0, 0, null));
+    HeldContainer hc1 = Mockito.spy(new HeldContainer(c1, 0, 0, null, containerSignatureMatcher));
     when(hc1.getNode()).thenReturn(node1Rack1);
     when(hc1.getRack()).thenReturn(rack1);
     when(c1.getId()).thenReturn(mockCId1);
@@ -1058,7 +1062,7 @@ public class TestTaskScheduler {
     ContainerId mockCId2 = ContainerId.newInstance(appId, 1);
     Container c2 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(c2.getNodeId().getHost()).thenReturn(""); // we are mocking directly
-    HeldContainer hc2 = Mockito.spy(new HeldContainer(c2, 0, 0, null));
+    HeldContainer hc2 = Mockito.spy(new HeldContainer(c2, 0, 0, null, containerSignatureMatcher));
     when(hc2.getNode()).thenReturn(node2Rack1);
     when(hc2.getRack()).thenReturn(rack1);
     when(c2.getId()).thenReturn(mockCId2);
@@ -1067,7 +1071,7 @@ public class TestTaskScheduler {
     ContainerId mockCId3 = ContainerId.newInstance(appId, 2);
     Container c3 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(c3.getNodeId().getHost()).thenReturn(""); // we are mocking directly
-    HeldContainer hc3 = Mockito.spy(new HeldContainer(c3, 0, 0, null));
+    HeldContainer hc3 = Mockito.spy(new HeldContainer(c3, 0, 0, null, containerSignatureMatcher));
     when(hc3.getNode()).thenReturn(node1Rack1);
     when(hc3.getRack()).thenReturn(rack1);
     when(c3.getId()).thenReturn(mockCId3);
@@ -1076,7 +1080,7 @@ public class TestTaskScheduler {
     ContainerId mockCId4 = ContainerId.newInstance(appId, 3);
     Container c4 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(c4.getNodeId().getHost()).thenReturn(""); // we are mocking directly
-    HeldContainer hc4 = Mockito.spy(new HeldContainer(c4, 0, 0, null));
+    HeldContainer hc4 = Mockito.spy(new HeldContainer(c4, 0, 0, null, containerSignatureMatcher));
     when(hc4.getNode()).thenReturn(node2Rack1);
     when(hc4.getRack()).thenReturn(rack1);
     when(c4.getId()).thenReturn(mockCId4);
@@ -1085,7 +1089,7 @@ public class TestTaskScheduler {
     ContainerId mockCId5 = ContainerId.newInstance(appId, 4);
     Container c5 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(c5.getNodeId().getHost()).thenReturn(""); // we are mocking directly
-    HeldContainer hc5 = Mockito.spy(new HeldContainer(c5, 0, 0, null));
+    HeldContainer hc5 = Mockito.spy(new HeldContainer(c5, 0, 0, null, containerSignatureMatcher));
     when(hc5.getNode()).thenReturn(node1Rack2);
     when(hc5.getRack()).thenReturn(rack2);
     when(c5.getId()).thenReturn(mockCId5);
@@ -1094,7 +1098,7 @@ public class TestTaskScheduler {
     ContainerId mockCId6 = ContainerId.newInstance(appId, 5);
     Container c6 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(c6.getNodeId().getHost()).thenReturn(""); // we are mocking directly
-    HeldContainer hc6 = Mockito.spy(new HeldContainer(c6, 0, 0, null));
+    HeldContainer hc6 = Mockito.spy(new HeldContainer(c6, 0, 0, null, containerSignatureMatcher));
     when(hc6.getNode()).thenReturn(node2Rack2);
     when(hc6.getRack()).thenReturn(rack2);
     when(c6.getId()).thenReturn(mockCId6);
@@ -1103,7 +1107,7 @@ public class TestTaskScheduler {
     ContainerId mockCId7 = ContainerId.newInstance(appId, 6);
     Container c7 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(c7.getNodeId().getHost()).thenReturn(""); // we are mocking directly
-    HeldContainer hc7 = Mockito.spy(new HeldContainer(c7, 0, 0, null));
+    HeldContainer hc7 = Mockito.spy(new HeldContainer(c7, 0, 0, null, containerSignatureMatcher));
     when(hc7.getNode()).thenReturn(node1Rack3);
     when(hc7.getRack()).thenReturn(rack3);
     when(c7.getId()).thenReturn(mockCId7);
@@ -1472,7 +1476,8 @@ public class TestTaskScheduler {
     containers.add(mockContainer4);
     
     // Fudge new container being present in delayed allocation list due to race
-    HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null);
+    HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null,
+        containerSignatureMatcher);
     scheduler.delayedContainerManager.delayedContainers.add(heldContainer);
     // no preemption - container assignment attempts < 3
     scheduler.getProgress();

http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index bb44889..77c98b7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -342,6 +342,11 @@ class TestTaskSchedulerHelpers {
         Map<String, LocalResource> lr2) {
       return Maps.newHashMap();
     }
+
+    @Override
+    public Object union(Object cs1, Object cs2) {
+      return cs1;
+    }
   }
   
   static class PreemptionMatcher implements ContainerSignatureMatcher {
@@ -365,6 +370,11 @@ class TestTaskSchedulerHelpers {
         Map<String, LocalResource> lr2) {
       return Maps.newHashMap();
     }
+
+    @Override
+    public Object union(Object cs1, Object cs2) {
+      return cs1;
+    }
   }
   
 


Mime
View raw message