hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject svn commit: r1362214 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...
Date Mon, 16 Jul 2012 19:13:55 GMT
Author: bobby
Date: Mon Jul 16 19:13:55 2012
New Revision: 1362214

URL: http://svn.apache.org/viewvc?rev=1362214&view=rev
Log:
svn merge -c 1362209 FIXES: MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers
to never be scheduled (Jason Lowe via bobby)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1362214&r1=1362213&r2=1362214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Jul 16 19:13:55
2012
@@ -328,6 +328,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4299. Terasort hangs with MR2 FifoScheduler (Tom White via
     bobby)
 
+    MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be
+    scheduled (Jason Lowe via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1362214&r1=1362213&r2=1362214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
Mon Jul 16 19:13:55 2012
@@ -122,6 +122,7 @@ public class RMContainerAllocator extend
   private int containersReleased = 0;
   private int hostLocalAssigned = 0;
   private int rackLocalAssigned = 0;
+  private int lastCompletedTasks = 0;
   
   private boolean recalculateReduceSchedule = false;
   private int mapResourceReqt;//memory
@@ -205,11 +206,18 @@ public class RMContainerAllocator extend
       scheduledRequests.assign(allocatedContainers);
       LOG.info("After Assign: " + getStat());
     }
-    
+
+    int completedMaps = getJob().getCompletedMaps();
+    int completedTasks = completedMaps + getJob().getCompletedReduces();
+    if (lastCompletedTasks != completedTasks) {
+      lastCompletedTasks = completedTasks;
+      recalculateReduceSchedule = true;
+    }
+
     if (recalculateReduceSchedule) {
       preemptReducesIfNeeded();
       scheduleReduces(
-          getJob().getTotalMaps(), getJob().getCompletedMaps(),
+          getJob().getTotalMaps(), completedMaps,
           scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
           assignedRequests.maps.size(), assignedRequests.reduces.size(),
           mapResourceReqt, reduceResourceReqt,

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1362214&r1=1362213&r2=1362214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Mon Jul 16 19:13:55 2012
@@ -1310,7 +1310,63 @@ public class TestRMContainerAllocator {
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampDownReduces(anyInt());
   }
+
+  private static class RecalculateContainerAllocator extends MyContainerAllocator {
+    public boolean recalculatedReduceSchedule = false;
+
+    public RecalculateContainerAllocator(MyResourceManager rm,
+        Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
+      super(rm, conf, appAttemptId, job);
+    }
+
+    @Override
+    public void scheduleReduces(int totalMaps, int completedMaps,
+        int scheduledMaps, int scheduledReduces, int assignedMaps,
+        int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+        int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+      recalculatedReduceSchedule = true;
+    }
+  }
   
+  @Test
+  public void testCompletedTasksRecalculateSchedule() throws Exception {
+    LOG.info("Running testCompletedTasksRecalculateSchedule");
+
+    Configuration conf = new Configuration();
+    final MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job job = mock(Job.class);
+    when(job.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+    doReturn(10).when(job).getTotalMaps();
+    doReturn(10).when(job).getTotalReduces();
+    doReturn(0).when(job).getCompletedMaps();
+    RecalculateContainerAllocator allocator =
+        new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
+    allocator.schedule();
+
+    allocator.recalculatedReduceSchedule = false;
+    allocator.schedule();
+    Assert.assertFalse("Unexpected recalculate of reduce schedule",
+        allocator.recalculatedReduceSchedule);
+
+    doReturn(1).when(job).getCompletedMaps();
+    allocator.schedule();
+    Assert.assertTrue("Expected recalculate of reduce schedule",
+        allocator.recalculatedReduceSchedule);
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();
@@ -1319,6 +1375,7 @@ public class TestRMContainerAllocator {
     t.testReportedAppProgress();
     t.testReportedAppProgressWithOnlyMaps();
     t.testBlackListedNodes();
+    t.testCompletedTasksRecalculateSchedule();
   }
 
 }



Mime
View raw message