hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject svn commit: r1348846 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/jav...
Date Mon, 11 Jun 2012 13:54:47 GMT
Author: tgraves
Date: Mon Jun 11 13:54:46 2012
New Revision: 1348846

URL: http://svn.apache.org/viewvc?rev=1348846&view=rev
Log:
MAPREDUCE-3927. Shuffle hang when set map.failures.percent (Bhallamudi Venkata Siva Kamesh via tgraves)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1348846&r1=1348845&r2=1348846&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jun 11 13:54:46 2012
@@ -570,6 +570,9 @@ Release 0.23.3 - UNRELEASED
 
     MAPREDUCE-3842. Stop webpages from automatic refreshing (tgraves)
 
+    MAPREDUCE-3927. Shuffle hang when set map.failures.percent
+    (Bhallamudi Venkata Siva Kamesh via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1348846&r1=1348845&r2=1348846&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Jun 11 13:54:46 2012
@@ -590,9 +590,9 @@ public class JobImpl implements org.apac
       float reduceProgress = 0f;
       for (Task task : this.tasks.values()) {
         if (task.getType() == TaskType.MAP) {
-          mapProgress += task.getProgress();
+          mapProgress += (task.isFinished() ? 1f : task.getProgress());
         } else {
-          reduceProgress += task.getProgress();
+          reduceProgress += (task.isFinished() ? 1f : task.getProgress());
         }
       }
       if (this.numMapTasks != 0) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1348846&r1=1348845&r2=1348846&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Mon Jun 11 13:54:46 2012
@@ -137,24 +137,26 @@ class ShuffleScheduler<K,V> {
 
       // update the status
       totalBytesShuffledTillNow += bytes;
-      float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
-      int mapsDone = totalMaps - remainingMaps;
-      long secsSinceStart = 
-        (System.currentTimeMillis()-startTime)/1000+1;
-
-      float transferRate = mbs/secsSinceStart;
-      progress.set((float) mapsDone / totalMaps);
-      String statusString = mapsDone + " / " + totalMaps + " copied.";
-      status.setStateString(statusString);
-      progress.setStatus("copy(" + mapsDone + " of " + totalMaps 
-          + " at " +
-          mbpsFormat.format(transferRate) +  " MB/s)");
-      
+      updateStatus();
       reduceShuffleBytes.increment(bytes);
       lastProgressTime = System.currentTimeMillis();
-      LOG.debug("map " + mapId + " done " + statusString);
+      LOG.debug("map " + mapId + " done " + status.getStateString());
     }
   }
+  
+  private void updateStatus() {
+    float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
+    int mapsDone = totalMaps - remainingMaps;
+    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+
+    float transferRate = mbs / secsSinceStart;
+    progress.set((float) mapsDone / totalMaps);
+    String statusString = mapsDone + " / " + totalMaps + " copied.";
+    status.setStateString(statusString);
+
+    progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
+        + mbpsFormat.format(transferRate) + " MB/s)");
+  }
 
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
                                       boolean readError) {
@@ -256,7 +258,13 @@ class ShuffleScheduler<K,V> {
   }
   
   public synchronized void tipFailed(TaskID taskId) {
-    finishedMaps[taskId.getId()] = true;
+    if (!finishedMaps[taskId.getId()]) {
+      finishedMaps[taskId.getId()] = true;
+      if (--remainingMaps == 0) {
+        notifyAll();
+      }
+      updateStatus();
+    }
   }
   
   public synchronized void addKnownMapOutput(String hostName, 

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java?rev=1348846&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java Mon Jun 11 13:54:46 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestShuffleScheduler {
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testTipFailed() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(2);
+
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+
+    ShuffleScheduler scheduler = new ShuffleScheduler(job, status, null,
+        progress, null, null, null);
+
+    JobID jobId = new JobID();
+    TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
+    scheduler.tipFailed(taskId1);
+
+    Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
+        0.0f);
+    Assert.assertFalse(scheduler.waitUntilDone(1));
+
+    TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
+    scheduler.tipFailed(taskId0);
+    Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
+        0.0f);
+    Assert.assertTrue(scheduler.waitUntilDone(1));
+  }
+}



Mime
View raw message