Author: tgraves
Date: Mon Jun 11 13:54:46 2012
New Revision: 1348846
URL: http://svn.apache.org/viewvc?rev=1348846&view=rev
Log:
MAPREDUCE-3927. Shuffle hang when set map.failures.percent (Bhallamudi Venkata Siva Kamesh
via tgraves)
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1348846&r1=1348845&r2=1348846&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jun 11 13:54:46 2012
@@ -570,6 +570,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-3842. Stop webpages from automatic refreshing (tgraves)
+ MAPREDUCE-3927. Shuffle hang when set map.failures.percent
+ (Bhallamudi Venkata Siva Kamesh via tgraves)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1348846&r1=1348845&r2=1348846&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Jun 11 13:54:46 2012
@@ -590,9 +590,9 @@ public class JobImpl implements org.apac
float reduceProgress = 0f;
for (Task task : this.tasks.values()) {
if (task.getType() == TaskType.MAP) {
- mapProgress += task.getProgress();
+ mapProgress += (task.isFinished() ? 1f : task.getProgress());
} else {
- reduceProgress += task.getProgress();
+ reduceProgress += (task.isFinished() ? 1f : task.getProgress());
}
}
if (this.numMapTasks != 0) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1348846&r1=1348845&r2=1348846&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Mon Jun 11 13:54:46 2012
@@ -137,24 +137,26 @@ class ShuffleScheduler<K,V> {
// update the status
totalBytesShuffledTillNow += bytes;
- float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
- int mapsDone = totalMaps - remainingMaps;
- long secsSinceStart =
- (System.currentTimeMillis()-startTime)/1000+1;
-
- float transferRate = mbs/secsSinceStart;
- progress.set((float) mapsDone / totalMaps);
- String statusString = mapsDone + " / " + totalMaps + " copied.";
- status.setStateString(statusString);
- progress.setStatus("copy(" + mapsDone + " of " + totalMaps
- + " at " +
- mbpsFormat.format(transferRate) + " MB/s)");
-
+ updateStatus();
reduceShuffleBytes.increment(bytes);
lastProgressTime = System.currentTimeMillis();
- LOG.debug("map " + mapId + " done " + statusString);
+ LOG.debug("map " + mapId + " done " + status.getStateString());
}
}
+
+ private void updateStatus() { // recompute progress fraction and status strings from the shuffle counters
+ float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
+ int mapsDone = totalMaps - remainingMaps;
+ long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+ // Average MB/s since startTime; the "+ 1" above keeps the divisor non-zero.
+ float transferRate = mbs / secsSinceStart;
+ progress.set((float) mapsDone / totalMaps);
+ String statusString = mapsDone + " / " + totalMaps + " copied.";
+ status.setStateString(statusString);
+ // Also publish the human-readable "copy(<done> of <total> at <rate> MB/s)" line.
+ progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
+ + mbpsFormat.format(transferRate) + " MB/s)");
+ }
public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
boolean readError) {
@@ -256,7 +258,13 @@ class ShuffleScheduler<K,V> {
}
public synchronized void tipFailed(TaskID taskId) {
- finishedMaps[taskId.getId()] = true;
+ if (!finishedMaps[taskId.getId()]) { // count each failed map TIP at most once
+ finishedMaps[taskId.getId()] = true;
+ if (--remainingMaps == 0) { // last outstanding map: wake threads waiting on this scheduler
+ notifyAll();
+ }
+ updateStatus(); // refresh progress so the failed map is reflected as done
+ }
}
public synchronized void addKnownMapOutput(String hostName,
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java?rev=1348846&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java Mon Jun 11 13:54:46 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestShuffleScheduler {
+ /** Failing a map TIP must advance shuffle progress and, once all maps are accounted for, let waitUntilDone() succeed. */
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testTipFailed() throws Exception {
+ JobConf job = new JobConf();
+ job.setNumMapTasks(2);
+ // Minimal reduce-side TaskStatus stub (getIsMap() returns false).
+ TaskStatus status = new TaskStatus() {
+ @Override
+ public boolean getIsMap() {
+ return false;
+ }
+
+ @Override
+ public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+ }
+ };
+ Progress progress = new Progress();
+
+ ShuffleScheduler scheduler = new ShuffleScheduler(job, status, null,
+ progress, null, null, null);
+ // Two maps in total: failing one map TIP should leave the shuffle half done.
+ JobID jobId = new JobID();
+ TaskID taskId1 = new TaskID(jobId, TaskType.MAP, 1);
+ scheduler.tipFailed(taskId1);
+
+ Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
+ 0.0f);
+ Assert.assertFalse(scheduler.waitUntilDone(1));
+ // Failing the remaining map TIP should complete the shuffle.
+ TaskID taskId0 = new TaskID(jobId, TaskType.MAP, 0);
+ scheduler.tipFailed(taskId0);
+ Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
+ 0.0f);
+ Assert.assertTrue(scheduler.waitUntilDone(1));
+ }
+}
|