hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject hadoop git commit: MAPREDUCE-6514. Fixed MapReduce ApplicationMaster to properly updated resources ask after ramping down of all reducers avoiding job hangs. Contributed by Varun Saxena and Wangda Tan. Updated the 2.7 branch patch to fix a minor issue in
Date Fri, 06 May 2016 03:43:34 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 916f94913 -> ab4fad0d8


MAPREDUCE-6514. Fixed MapReduce ApplicationMaster to properly updated resources ask after
ramping down of all reducers avoiding job hangs. Contributed by Varun Saxena and Wangda Tan.
Updated the 2.7 branch patch to fix a minor issue in TestRMContainerAllocator.java and a minor
fix in the 2.6 patch.

(cherry picked from commit 8d48266720dcf0e71cfd87fef18b60a53aa1bef9)
(cherry picked from commit 9f51a3f430e4a8efc54193af4fe9fc1509f9980f)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab4fad0d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab4fad0d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab4fad0d

Branch: refs/heads/branch-2.6
Commit: ab4fad0d82ff96508af4afb6ad58e79c71afa454
Parents: 916f949
Author: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Authored: Thu May 5 19:01:52 2016 -0700
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Thu May 5 20:43:07 2016 -0700

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerAllocator.java         |  28 +++--
 .../v2/app/rm/RMContainerRequestor.java         |   6 +
 .../v2/app/rm/TestRMContainerAllocator.java     | 124 +++++++++++++++++++
 3 files changed, 147 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab4fad0d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 6b62af4..f53e1f8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -98,7 +98,7 @@ public class RMContainerAllocator extends RMContainerRequestor
   float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
   
   private static final Priority PRIORITY_FAST_FAIL_MAP;
-  private static final Priority PRIORITY_REDUCE;
+  static final Priority PRIORITY_REDUCE;
   private static final Priority PRIORITY_MAP;
 
   @VisibleForTesting
@@ -501,12 +501,7 @@ public class RMContainerAllocator extends RMContainerRequestor
   }
 
   private void clearAllPendingReduceRequests() {
-    LOG.info("Ramping down all scheduled reduces:"
-        + scheduledRequests.reduces.size());
-    for (ContainerRequest req : scheduledRequests.reduces.values()) {
-      pendingReduces.add(req);
-    }
-    scheduledRequests.reduces.clear();
+    rampDownReduces(Integer.MAX_VALUE);
   }
 
   private void preemptReducer(int hangingMapRequests) {
@@ -678,9 +673,13 @@ public class RMContainerAllocator extends RMContainerRequestor
   @Private
   public void rampDownReduces(int rampDown) {
     //remove from the scheduled and move back to pending
-    for (int i = 0; i < rampDown; i++) {
+    while (rampDown > 0) {
       ContainerRequest request = scheduledRequests.removeReduce();
+      if (request == null) {
+        return;
+      }
       pendingReduces.add(request);
+      rampDown--;
     }
   }
   
@@ -860,6 +859,11 @@ public class RMContainerAllocator extends RMContainerRequestor
       Resources.add(assignedMapResource, assignedReduceResource));
   }
 
+  @VisibleForTesting
+  public int getNumOfPendingReduces() {
+    return pendingReduces.size();
+  }
+
   @Private
   @VisibleForTesting
   class ScheduledRequests {
@@ -875,8 +879,9 @@ public class RMContainerAllocator extends RMContainerRequestor
     @VisibleForTesting
     final Map<TaskAttemptId, ContainerRequest> maps =
       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
-    
-    private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces = 
+
+    @VisibleForTesting
+    final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
     
     boolean remove(TaskAttemptId tId) {
@@ -1271,7 +1276,8 @@ public class RMContainerAllocator extends RMContainerRequestor
   class AssignedRequests {
     private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
       new HashMap<ContainerId, TaskAttemptId>();
-    private final LinkedHashMap<TaskAttemptId, Container> maps = 
+    @VisibleForTesting
+    final LinkedHashMap<TaskAttemptId, Container> maps =
       new LinkedHashMap<TaskAttemptId, Container>();
     @VisibleForTesting
     final LinkedHashMap<TaskAttemptId, Container> reduces =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab4fad0d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 9ce245f..3b691f8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -494,4 +494,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   public Set<String> getBlacklistedNodes() {
     return blacklistedNodes;
   }
+
+  @Private
+  @VisibleForTesting
+  Set<ResourceRequest> getAsk() {
+    return ask;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab4fad0d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index ce101c5..af585a3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -1680,6 +1680,8 @@ public class TestRMContainerAllocator {
       when(context.getApplicationID()).thenReturn(appId);
       when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
       when(context.getJob(isA(JobId.class))).thenReturn(job);
+      when(context.getClock()).thenReturn(
+        new ControlledClock(new SystemClock()));
       when(context.getClusterInfo()).thenReturn(
         new ClusterInfo(Resource.newInstance(10240, 1)));
       when(context.getEventHandler()).thenReturn(new EventHandler() {
@@ -2456,6 +2458,128 @@ public class TestRMContainerAllocator {
         new Text(rmAddr), ugiToken.getService());
   }
 
+  @Test
+  public void testUpdateAskOnRampDownAllReduces() throws Exception {
+    LOG.info("Running testUpdateAskOnRampDownAllReduces");
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+    // Use a controlled clock to advance time for test.
+    ControlledClock clock = (ControlledClock)allocator.getContext().getClock();
+    clock.setTime(System.currentTimeMillis());
+
+    // Register nodes to RM.
+    MockNM nodeManager = rm.registerNode("h1:1234", 1024);
+    dispatcher.await();
+
+    // Request 2 maps and 1 reducer(sone on nodes which are not registered).
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 1024, new String[] { "h2" });
+    allocator.sendRequest(event2);
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+    allocator.sendRequest(event3);
+
+    // This will tell the scheduler about the requests but there will be no
+    // allocations as nodes are not added.
+    allocator.schedule();
+    dispatcher.await();
+
+    // Advance clock so that maps can be considered as hanging.
+    clock.setTime(System.currentTimeMillis() + 500000L);
+
+    // Request for another reducer on h3 which has not registered.
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+    allocator.sendRequest(event4);
+
+    allocator.schedule();
+    dispatcher.await();
+
+    // Update resources in scheduler through node heartbeat from h1.
+    nodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
+    allocator.schedule();
+    dispatcher.await();
+
+    // One map is assigned.
+    Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
+    // Send deallocate request for map so that no maps are assigned after this.
+    ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false);
+    allocator.sendDeallocate(deallocate);
+    // Now one reducer should be scheduled and one should be pending.
+    Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size());
+    Assert.assertEquals(1, allocator.getNumOfPendingReduces());
+    // No map should be assigned and one should be scheduled.
+    Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
+    Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
+
+    Assert.assertEquals(6, allocator.getAsk().size());
+    for (ResourceRequest req : allocator.getAsk()) {
+      boolean isReduce =
+          req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
+      if (isReduce) {
+        // 1 reducer each asked on h2, * and default-rack
+        Assert.assertTrue((req.getResourceName().equals("*") ||
+            req.getResourceName().equals("/default-rack") ||
+            req.getResourceName().equals("h2")) && req.getNumContainers() == 1);
+      } else { //map
+        // 0 mappers asked on h1 and 1 each on * and default-rack
+        Assert.assertTrue(((req.getResourceName().equals("*") ||
+            req.getResourceName().equals("/default-rack")) &&
+            req.getNumContainers() == 1) || (req.getResourceName().equals("h1")
+            && req.getNumContainers() == 0));
+      }
+    }
+    // On next allocate request to scheduler, headroom reported will be 0.
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0));
+    allocator.schedule();
+    dispatcher.await();
+    // After allocate response from scheduler, all scheduled reduces are ramped
+    // down and move to pending. 3 asks are also updated with 0 containers to
+    // indicate ramping down of reduces to scheduler.
+    Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
+    Assert.assertEquals(2, allocator.getNumOfPendingReduces());
+    Assert.assertEquals(3, allocator.getAsk().size());
+    for (ResourceRequest req : allocator.getAsk()) {
+      Assert.assertEquals(
+          RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
+      Assert.assertTrue(req.getResourceName().equals("*") ||
+          req.getResourceName().equals("/default-rack") ||
+          req.getResourceName().equals("h2"));
+      Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability());
+      Assert.assertEquals(0, req.getNumContainers());
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message