tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-330. Ignore rack-local / non-local configs for tasks which do not need locality. (sseth)
Date Thu, 22 Aug 2013 17:22:46 GMT
Updated Branches:
  refs/heads/master 9da92ac8d -> 3f8f5bead


TEZ-330. Ignore rack-local / non-local configs for tasks which do not
need locality. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3f8f5bea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3f8f5bea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3f8f5bea

Branch: refs/heads/master
Commit: 3f8f5bead6cb046c89a3cd2069ea1b0507689b44
Parents: 9da92ac
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Aug 22 10:22:24 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Aug 22 10:22:24 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/rm/TaskScheduler.java    |  69 ++++++++++-
 .../tez/dag/app/rm/TestContainerReuse.java      | 116 +++++++++++++++++--
 2 files changed, 167 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f8f5bea/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 59414df..3e93d3e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -839,7 +839,7 @@ public class TaskScheduler extends AbstractService
           + container.getResource().toString();
       if (!(pendingMap.containsKey(identifier))) {
         pendingMap.put(identifier,
-            Boolean.valueOf(hasAnyMatchingRequests(container)));
+            Boolean.valueOf(hasPendingANYRequests(container)));
       }
       if (pendingMap.get(identifier).equals(Boolean.valueOf(false))) {
         iterator.remove();
@@ -848,7 +848,59 @@ public class TaskScheduler extends AbstractService
     }
   }
 
-  private boolean hasAnyMatchingRequests(Container container) {
+  
+  /**
+   * Returns true if a node-local request exists at the specified priority level for
+   * the specified capability. NOTE: Makes an assumption that a specific
+   * priority level does not contain a mix of local/non-local requests.
+   */
+  private boolean hasPendingNodeLocalRequest(Priority priority,
+      Resource capability) {
+    String location = ResourceRequest.ANY;
+    List<? extends Collection<CookieContainerRequest>> requestsList = amRmClient
+        .getMatchingRequests(priority, location, capability);
+    if (requestsList.size() > 0) {
+      // pick first one
+      for (Collection<CookieContainerRequest> requests : requestsList) {
+        Iterator<CookieContainerRequest> iterator = requests.iterator();
+        if (iterator.hasNext()) {
+          // Only the first request is inspected: does it name any nodes?
+          CookieContainerRequest cookieContainerRequest = iterator.next();
+          return (cookieContainerRequest.getNodes().size() > 0);
+        }
+      }
+    }
+    return false;
+  }
+  
+  /**
+   * Returns true if a node/rack local request exists at the specified priority
+   * level for the specified capability. NOTE: Makes an assumption that a
+   * specific priority level does not contain a mix of local/non-local requests.
+   */
+  private boolean hasPendingLocalRequest(Priority priority, Resource capability) {
+    String location = ResourceRequest.ANY;
+    List<? extends Collection<CookieContainerRequest>> requestsList = amRmClient
+        .getMatchingRequests(priority, location, capability);
+    if (requestsList.size() > 0) {
+      // pick first one
+      for (Collection<CookieContainerRequest> requests : requestsList) {
+        Iterator<CookieContainerRequest> iterator = requests.iterator();
+        if (iterator.hasNext()) {
+          // Only the first request is inspected: does it name any nodes or racks?
+          CookieContainerRequest cookieContainerRequest = iterator.next();
+          return (cookieContainerRequest.getNodes().size() > 0 || cookieContainerRequest
+              .getRacks().size() > 0);
+        }
+      }
+    }
+    return false;
+  }
+  
+  /**
+   * Returns true if there are any pending requests for the specified container.
+   */
+  private boolean hasPendingANYRequests(Container container) {
     Priority priority = container.getPriority();
     String location = ResourceRequest.ANY;
     Resource capability = container.getResource();
@@ -860,6 +912,7 @@ public class TaskScheduler extends AbstractService
     return false;
   }
 
+
   private abstract class ContainerAssigner {
     public abstract CookieContainerRequest assignAllocatedContainer(
         Container container, boolean honorLocalityFlags);
@@ -895,13 +948,14 @@ public class TaskScheduler extends AbstractService
     }
   }
 
-  // TODO TEZ-330 Ignore reuseRackLocak/reuseNonLocal in case the initial
-  // request did not have any locality information.
   private class RackLocalContainerAssigner extends ContainerAssigner {
     @Override
     public CookieContainerRequest assignAllocatedContainer(Container container,
         boolean honorLocalityFlags) {
-      if (!honorLocalityFlags || TaskScheduler.this.reuseRackLocal) {
+      if (!honorLocalityFlags
+          || TaskScheduler.this.reuseRackLocal
+          || !hasPendingNodeLocalRequest(container.getPriority(),
+              container.getResource())) {
         String location = RackResolver.resolve(container.getNodeId().getHost())
             .getNetworkLocation();
         CookieContainerRequest assigned = getMatchingRequest(container,
@@ -918,7 +972,10 @@ public class TaskScheduler extends AbstractService
     @Override
     public CookieContainerRequest assignAllocatedContainer(Container container,
         boolean honorLocalityFlags) {
-      if (!honorLocalityFlags || TaskScheduler.this.reuseNonLocal) {
+      if (!honorLocalityFlags
+          || TaskScheduler.this.reuseNonLocal
+          || !hasPendingLocalRequest(container.getPriority(),
+              container.getResource())) {
         String location = ResourceRequest.ANY;
         CookieContainerRequest assigned = getMatchingRequest(container,
             location);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3f8f5bea/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 40b4c17..057c59d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -64,7 +64,7 @@ import com.google.common.collect.Lists;
 public class TestContainerReuse {
   
 
-  @Test
+  @Test(timeout = 15000l)
   public void testDelayedReuseContainerBecomesAvailable() throws IOException, InterruptedException,
ExecutionException {
     Configuration conf = new Configuration(new YarnConfiguration());
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
@@ -103,28 +103,28 @@ public class TestContainerReuse {
     TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback)
((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
         .getSpyTaskScheduler();
     TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
-    
+
     Resource resource = Resource.newInstance(1024, 1);
     Priority priority = Priority.newInstance(5);
     String [] host1 = {"host1"};
     String [] host2 = {"host2"};
-    
+
     String [] defaultRack = {"/default-rack"};
-    
+
     TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID, 1), 1);
     TezTaskAttemptID taID21 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
     TezTaskAttemptID taID31 = new TezTaskAttemptID(new TezTaskID(vertexID, 3), 1);
     TaskAttempt ta11 = mock(TaskAttempt.class);
     TaskAttempt ta21 = mock(TaskAttempt.class);
     TaskAttempt ta31 = mock(TaskAttempt.class);
-    
+
     AMSchedulerEventTALaunchRequest lrTa11 = createLaunchRequestEvent(taID11, ta11, resource,
host1, defaultRack, priority, conf);
     AMSchedulerEventTALaunchRequest lrTa21 = createLaunchRequestEvent(taID21, ta21, resource,
host2, defaultRack, priority, conf);
     AMSchedulerEventTALaunchRequest lrTa31 = createLaunchRequestEvent(taID31, ta31, resource,
host1, defaultRack, priority, conf);
-    
+
     taskSchedulerEventHandler.handleEvent(lrTa11);
     taskSchedulerEventHandler.handleEvent(lrTa21);
-    
+
     Container containerHost1 = createContainer(1, host1[0], resource, priority);
     Container containerHost2 = createContainer(2, host2[0], resource, priority);
 
@@ -132,10 +132,10 @@ public class TestContainerReuse {
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2));
-    
+
     // Adding the event later so that task1 assigned to containerHost1 is deterministic.
     taskSchedulerEventHandler.handleEvent(lrTa31);
-   
+
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, containerHost1.getId(),
TaskAttemptState.SUCCEEDED));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
@@ -163,7 +163,7 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.close();
   }
   
-  @Test
+  @Test(timeout = 15000l)
   public void testDelayedReuseContainerNotAvailable() throws IOException, InterruptedException,
ExecutionException {
     Configuration conf = new Configuration(new YarnConfiguration());
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
@@ -260,7 +260,7 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.close();
   }
 
-  @Test
+  @Test(timeout = 10000l)
   public void testSimpleReuse() throws IOException, InterruptedException, ExecutionException
{
     Configuration tezConf = new Configuration(new YarnConfiguration());
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
@@ -385,7 +385,99 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.close();
   }
   
-  @Test
+  @Test(timeout = 10000l)
+  public void testReuseNonLocalRequest() throws IOException, InterruptedException, ExecutionException
{
+    Configuration tezConf = new Configuration(new YarnConfiguration());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED,
false);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_DELAY_ALLOCATION_MILLIS, 10000l);
+    RackResolver.init(tezConf);
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+
+    CapturingEventHandler eventHandler = new CapturingEventHandler();
+    TezDAGID dagID = new TezDAGID("0", 0, 0);
+
+    AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
+    AMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore,
100));
+    String appUrl = "url";
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+
+    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
+
+    AppContext appContext = mock(AppContext.class);
+    AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
mock(TaskAttemptListener.class), appContext);
+    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(amNodeMap).when(appContext).getAllNodes();
+    doReturn(dagID).when(appContext).getCurrentDAGID();
+    doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+    TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext,
eventHandler, rmClient);
+    TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
+    taskSchedulerEventHandler.init(tezConf);
+    taskSchedulerEventHandler.start();
+
+    TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback)
((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+        .getSpyTaskScheduler();
+    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+
+    Resource resource1 = Resource.newInstance(1024, 1);
+    String [] emptyHosts = new String[0];
+    String [] emptyRacks = new String[0];
+
+    Priority priority = Priority.newInstance(3);
+
+    TezVertexID vertexID = new TezVertexID(dagID, 1);
+
+    //Vertex 1, Task 1, Attempt 1, no locality information.
+    TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID, 1), 1);
+    TaskAttempt ta11 = mock(TaskAttempt.class);
+    doReturn(vertexID).when(ta11).getVertexID();
+    AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(taID11, ta11, resource1,
emptyHosts, emptyRacks, priority, tezConf);
+
+    //Vertex 1, Task 2, Attempt 1, no locality information.
+    TezTaskAttemptID taID12 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
+    TaskAttempt ta12 = mock(TaskAttempt.class);
+    doReturn(vertexID).when(ta12).getVertexID();
+    AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID12, ta12, resource1,
emptyHosts, emptyRacks, priority, tezConf);
+    
+    // Send launch request for task 1 only, deterministic assignment to this task.
+    taskSchedulerEventHandler.handleEvent(lrEvent11);
+
+    Container container1 = createContainer(1, "randomHost", resource1, priority);
+
+    // One container allocated.
+    taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
+
+    // Send launch request for task 2 (vertex 1)
+    taskSchedulerEventHandler.handleEvent(lrEvent12);
+    
+    // Task assigned to container completed successfully. Container should be
+    // assigned immediately to ta12, since there are no local requests (instead of
+    // waiting for the re-use delay)
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(),
TaskAttemptState.SUCCEEDED));
+    drainableAppCallback.drain();
+    verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1));
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    // TA12 completed.
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(),
TaskAttemptState.SUCCEEDED));
+    drainableAppCallback.drain();
+    verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+
+    taskScheduler.close();
+    taskSchedulerEventHandler.close();
+  }
+  
+  @Test(timeout = 10000l)
   public void testNoReuseAcrossVertices() throws IOException, InterruptedException, ExecutionException
{
     Configuration tezConf = new Configuration(new YarnConfiguration());
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);


Mime
View raw message