Subject: svn commit: r1369164 [2/7] - in
/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/
dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduc...
Date: Fri, 03 Aug 2012 19:00:51 -0000
To: mapreduce-commits@hadoop.apache.org
From: gkesavan@apache.org
X-Mailer: svnmailer-1.0.8-patched
Message-Id: <20120803190059.DA77D2388AA9@eris.apache.org>
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Aug 3 19:00:15 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -47,9 +46,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -131,6 +127,7 @@ public class RMContainerAllocator extend
private int containersReleased = 0;
private int hostLocalAssigned = 0;
private int rackLocalAssigned = 0;
+ private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory
@@ -214,11 +211,18 @@ public class RMContainerAllocator extend
scheduledRequests.assign(allocatedContainers);
LOG.info("After Assign: " + getStat());
}
-
+
+ int completedMaps = getJob().getCompletedMaps();
+ int completedTasks = completedMaps + getJob().getCompletedReduces();
+ if (lastCompletedTasks != completedTasks) {
+ lastCompletedTasks = completedTasks;
+ recalculateReduceSchedule = true;
+ }
+
if (recalculateReduceSchedule) {
preemptReducesIfNeeded();
scheduleReduces(
- getJob().getTotalMaps(), getJob().getCompletedMaps(),
+ getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt,
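[Editorial sketch] The hunk above amounts to a change-detection guard: the reduce schedule is recalculated only when the completed-task count has actually moved since the last heartbeat. A self-contained sketch of that pattern (class and method names here are hypothetical, not part of the commit):

// Hypothetical stand-in for the lastCompletedTasks guard added to
// RMContainerAllocator: recalculate only on real task progress.
class ReduceScheduleTrigger {
  private int lastCompletedTasks = 0;

  boolean shouldRecalculate(int completedMaps, int completedReduces) {
    int completedTasks = completedMaps + completedReduces;
    if (lastCompletedTasks != completedTasks) {
      lastCompletedTasks = completedTasks;  // remember the new count
      return true;                          // a task finished; reschedule
    }
    return false;                           // nothing changed; skip the work
  }
}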
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java Fri Aug 3 19:00:15 2012
@@ -78,14 +78,29 @@ public class ConfBlock extends HtmlBlock
tr().
th(_TH, "key").
th(_TH, "value").
+ th(_TH, "source chain").
_().
_().
tbody();
for (ConfEntryInfo entry : info.getProperties()) {
+ StringBuffer buffer = new StringBuffer();
+ String[] sources = entry.getSource();
+ //Skip the last entry, because it is always the same HDFS file, and
+ // output them in reverse order so most recent is output first
+ boolean first = true;
+ for(int i = (sources.length - 2); i >= 0; i--) {
+ if(!first) {
+ // \u2B05 is an arrow <--
+ buffer.append(" \u2B05 ");
+ }
+ first = false;
+ buffer.append(sources[i]);
+ }
tbody.
tr().
td(entry.getName()).
td(entry.getValue()).
+ td(buffer.toString()).
_();
}
tbody._().
@@ -93,6 +108,7 @@ public class ConfBlock extends HtmlBlock
tr().
th().input("search_init").$type(InputType.text).$name("key").$value("key")._()._().
th().input("search_init").$type(InputType.text).$name("value").$value("value")._()._().
+ th().input("search_init").$type(InputType.text).$name("source chain").$value("source chain")._()._().
_().
_().
_();
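[Editorial sketch] The loop above renders a property's source chain newest-first, skipping the final entry (always the job's own configuration file). A standalone sketch of the string-building, with made-up source names:

// Sketch of ConfBlock's source-chain rendering; inputs are illustrative.
public class SourceChain {
  static String render(String[] sources) {
    StringBuilder buffer = new StringBuilder();
    // Skip the last source and walk backwards so the most recent is first.
    for (int i = sources.length - 2; i >= 0; i--) {
      if (buffer.length() > 0) {
        buffer.append(" \u2B05 ");  // leftwards arrow between entries
      }
      buffer.append(sources[i]);
    }
    return buffer.toString();
  }

  public static void main(String[] args) {
    System.out.println(render(new String[] {
        "mapred-site.xml", "programatically", "job.xml" }));
    // prints: programatically <-- mapred-site.xml
  }
}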
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java Fri Aug 3 19:00:15 2012
@@ -64,7 +64,7 @@ public class AMAttemptInfo {
if (containerId != null) {
this.containerId = containerId.toString();
this.logsLink = join("http://" + nodeHttpAddress,
- ujoin("node", "containerlogs", this.containerId));
+ ujoin("node", "containerlogs", this.containerId, user));
}
}
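[Editorial sketch] With the user appended, the generated link takes the shape below; the host, container id, and user are made up for illustration (ujoin in the real code URL-joins the segments):

class ContainerLogsLinkSketch {
  static String containerLogsLink(String nodeHttpAddress, String containerId,
      String user) {
    // e.g. http://nm-host:8042/node/containerlogs/container_x_0001_01_000003/alice
    return "http://" + nodeHttpAddress + "/node/containerlogs/"
        + containerId + "/" + user;
  }
}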
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java Fri Aug 3 19:00:15 2012
@@ -27,13 +27,19 @@ public class ConfEntryInfo {
protected String name;
protected String value;
+ protected String[] source;
public ConfEntryInfo() {
}
public ConfEntryInfo(String key, String value) {
+ this(key, value, null);
+ }
+
+ public ConfEntryInfo(String key, String value, String[] source) {
this.name = key;
this.value = value;
+ this.source = source;
}
public String getName() {
@@ -43,4 +49,8 @@ public class ConfEntryInfo {
public String getValue() {
return this.value;
}
+
+ public String[] getSource() {
+ return source;
+ }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java Fri Aug 3 19:00:15 2012
@@ -46,7 +46,8 @@ public class ConfInfo {
Configuration jobConf = job.loadConfFile();
this.path = job.getConfFile().toString();
for (Map.Entry<String, String> entry : jobConf) {
- this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue()));
+ this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue(),
+ jobConf.getPropertySources(entry.getKey())));
}
}
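[Editorial sketch] Configuration.getPropertySources(name) supplies the provenance array consumed above. A minimal usage sketch; the property name and value are arbitrary, and the method returns null for properties that were never set:

import org.apache.hadoop.conf.Configuration;

public class ShowSources {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("mapreduce.job.queuename", "analytics");
    String[] sources = conf.getPropertySources("mapreduce.job.queuename");
    if (sources != null) {
      for (String s : sources) {
        System.out.println(s);  // e.g. "programatically"
      }
    }
  }
}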
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Aug 3 19:00:15 2012
@@ -603,7 +603,7 @@ public class MockJobs extends MockApps {
public Configuration loadConfFile() throws IOException {
FileContext fc = FileContext.getFileContext(configFile.toUri(), conf);
Configuration jobConf = new Configuration(false);
- jobConf.addResource(fc.open(configFile));
+ jobConf.addResource(fc.open(configFile), configFile.toString());
return jobConf;
}
};
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Aug 3 19:00:15 2012
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -86,11 +87,13 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Test;
@@ -352,7 +355,7 @@ public class TestRMContainerAllocator {
}
@Override
protected ResourceScheduler createScheduler() {
- return new MyFifoScheduler();
+ return new MyFifoScheduler(this.getRMContext());
}
}
@@ -1091,6 +1094,19 @@ public class TestRMContainerAllocator {
}
private static class MyFifoScheduler extends FifoScheduler {
+
+ public MyFifoScheduler(RMContext rmContext) {
+ super();
+ try {
+ Configuration conf = new Configuration();
+ reinitialize(conf, new ContainerTokenSecretManager(conf),
+ rmContext);
+ } catch (IOException ie) {
+ LOG.info("add application failed with ", ie);
+ assert (false);
+ }
+ }
+
// override this to copy the objects otherwise FifoScheduler updates the
// numContainers in same objects as kept by RMContainerAllocator
@Override
@@ -1393,7 +1409,63 @@ public class TestRMContainerAllocator {
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt());
}
+
+ private static class RecalculateContainerAllocator extends MyContainerAllocator {
+ public boolean recalculatedReduceSchedule = false;
+
+ public RecalculateContainerAllocator(MyResourceManager rm,
+ Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
+ super(rm, conf, appAttemptId, job);
+ }
+
+ @Override
+ public void scheduleReduces(int totalMaps, int completedMaps,
+ int scheduledMaps, int scheduledReduces, int assignedMaps,
+ int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+ int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+ recalculatedReduceSchedule = true;
+ }
+ }
+ @Test
+ public void testCompletedTasksRecalculateSchedule() throws Exception {
+ LOG.info("Running testCompletedTasksRecalculateSchedule");
+
+ Configuration conf = new Configuration();
+ final MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job job = mock(Job.class);
+ when(job.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ doReturn(10).when(job).getTotalMaps();
+ doReturn(10).when(job).getTotalReduces();
+ doReturn(0).when(job).getCompletedMaps();
+ RecalculateContainerAllocator allocator =
+ new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
+ allocator.schedule();
+
+ allocator.recalculatedReduceSchedule = false;
+ allocator.schedule();
+ Assert.assertFalse("Unexpected recalculate of reduce schedule",
+ allocator.recalculatedReduceSchedule);
+
+ doReturn(1).when(job).getCompletedMaps();
+ allocator.schedule();
+ Assert.assertTrue("Expected recalculate of reduce schedule",
+ allocator.recalculatedReduceSchedule);
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
@@ -1402,6 +1474,7 @@ public class TestRMContainerAllocator {
t.testReportedAppProgress();
t.testReportedAppProgressWithOnlyMaps();
t.testBlackListedNodes();
+ t.testCompletedTasksRecalculateSchedule();
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Fri Aug 3 19:00:15 2012
@@ -565,6 +565,73 @@ public class TestTaskAttempt{
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
eventHandler.internalError);
}
+
+ @Test
+ public void testDoubleTooManyFetchFailure() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ splits, jobConf, taListener,
+ mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ new SystemClock(), appCtx);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+ container, mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_DONE));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+
+ assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+ TaskAttemptState.SUCCEEDED);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+ assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+ TaskAttemptState.FAILED);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+ assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
+ TaskAttemptState.FAILED);
+ assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+ eventHandler.internalError);
+ }
public static class MockEventHandler implements EventHandler {
public boolean internalError;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Aug 3 19:00:15 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -127,8 +128,13 @@ public class TestTaskImpl {
@Override
protected int getMaxAttempts() {
return 100;
- }
-
+ }
+
+ @Override
+ protected void internalError(TaskEventType type) {
+ super.internalError(type);
+ fail("Internal error: " + type);
+ }
}
private class MockTaskAttemptImpl extends TaskAttemptImpl {
@@ -462,5 +468,32 @@ public class TestTaskImpl {
assertTaskSucceededState();
}
+
+ @Test
+ public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ updateLastAttemptState(TaskAttemptState.RUNNING);
+
+ // Add a speculative task attempt that succeeds
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ commitTaskAttempt(getLastAttempt().getAttemptId());
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+ // The task should now have succeeded
+ assertTaskSucceededState();
+
+ // Now fail the first task attempt, after the second has succeeded
+ mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
+ TaskEventType.T_ATTEMPT_FAILED));
+
+ // The task should still be in the succeeded state
+ assertTaskSucceededState();
+
+ }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java Fri Aug 3 19:00:15 2012
@@ -972,7 +972,8 @@ public class TestAMWebServicesJobs exten
WebServicesTestUtils.checkStringMatch("containerId", amInfo
.getContainerId().toString(), containerId);
- String localLogsLink = ujoin("node", "containerlogs", containerId);
+ String localLogsLink = ujoin("node", "containerlogs", containerId,
+ job.getUserName());
assertTrue("logsLink", logsLink.contains(localLogsLink));
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Fri Aug 3 19:00:15 2012
@@ -18,12 +18,9 @@
package org.apache.hadoop.mapred;
-import com.google.common.collect.Maps;
-
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
-import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -34,6 +31,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -85,6 +84,8 @@ class LocalDistributedCacheManager {
* @throws IOException
*/
public void setup(JobConf conf) throws IOException {
+ File workDir = new File(System.getProperty("user.dir"));
+
// Generate YARN local resources objects corresponding to the distributed
// cache configuration
Map<String, LocalResource> localResources =
@@ -132,7 +133,8 @@ class LocalDistributedCacheManager {
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}
- for (LocalResource resource : localResources.values()) {
+ for (Entry<String, LocalResource> entry : localResources.entrySet()) {
+ LocalResource resource = entry.getValue();
Path path;
try {
path = resourcesToPaths.get(resource).get();
@@ -142,6 +144,10 @@ class LocalDistributedCacheManager {
throw new IOException(e);
}
String pathString = path.toUri().toString();
+ String link = entry.getKey();
+ String target = new File(path.toUri()).getPath();
+ symlink(workDir, target, link);
+
if (resource.getType() == LocalResourceType.ARCHIVE) {
localArchives.add(pathString);
} else if (resource.getType() == LocalResourceType.FILE) {
@@ -175,27 +181,6 @@ class LocalDistributedCacheManager {
.arrayToString(localFiles.toArray(new String[localArchives
.size()])));
}
- if (DistributedCache.getSymlink(conf)) {
- File workDir = new File(System.getProperty("user.dir"));
- URI[] archives = DistributedCache.getCacheArchives(conf);
- URI[] files = DistributedCache.getCacheFiles(conf);
- Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
- Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
- if (archives != null) {
- for (int i = 0; i < archives.length; i++) {
- String link = archives[i].getFragment();
- String target = new File(localArchives[i].toUri()).getPath();
- symlink(workDir, target, link);
- }
- }
- if (files != null) {
- for (int i = 0; i < files.length; i++) {
- String link = files[i].getFragment();
- String target = new File(localFiles[i].toUri()).getPath();
- symlink(workDir, target, link);
- }
- }
- }
setupCalled = true;
}
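[Editorial sketch] Every localized resource is now linked into the task working directory unconditionally, keyed by its link name; the old block guarded by DistributedCache.getSymlink(conf) is deleted above. A stand-in sketch of the per-entry linking (everything here except the idea of a symlink helper is hypothetical):

import java.io.File;
import java.util.Map;

class LinkAllSketch {
  /** Link every localized file into workDir under its link name. */
  static void linkAll(File workDir, Map<String, File> localized) {
    for (Map.Entry<String, File> e : localized.entrySet()) {
      String link = e.getKey();                // name in the task cwd
      String target = e.getValue().getPath();  // localized absolute path
      symlink(workDir, target, link);
    }
  }

  private static void symlink(File workDir, String target, String link) {
    // The real class delegates to a platform symlink helper; elided here.
  }
}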
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Fri Aug 3 19:00:15 2012
@@ -383,6 +383,7 @@ public class TypeConverter {
switch (yarnApplicationState) {
case NEW:
case SUBMITTED:
+ case ACCEPTED:
return State.PREP;
case RUNNING:
return State.RUNNING;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Fri Aug 3 19:00:15 2012
@@ -133,5 +133,16 @@ public class JHAdminConfig {
* The HistoryStorage class to use to cache history data.
*/
public static final String MR_HISTORY_STORAGE =
- MR_HISTORY_PREFIX + ".store.class";
+ MR_HISTORY_PREFIX + "store.class";
+
+ /** Whether to use fixed ports with the minicluster. */
+ public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX
+ + "minicluster.fixed.ports";
+
+ /**
+ * Default is false to be able to run tests concurrently without port
+ * conflicts.
+ */
+ public static boolean DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS = false;
+
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Fri Aug 3 19:00:15 2012
@@ -171,8 +171,9 @@ public class MRApps extends Apps {
}
// Add standard Hadoop classes
- for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
- .split(",")) {
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
.trim());
}
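[Editorial sketch] The switch to conf.getStrings matters because conf.get(KEY).split(",") throws a NullPointerException when the key is absent, whereas getStrings falls back to the supplied defaults. A small sketch; the classpath entries shown are illustrative:

import org.apache.hadoop.conf.Configuration;

public class ClasspathRead {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Unset key: get().split(",") would NPE; getStrings returns the defaults.
    String[] entries = conf.getStrings("yarn.application.classpath",
        "$HADOOP_CONF_DIR", "$HADOOP_COMMON_HOME/share/hadoop/common/*");
    for (String c : entries) {
      System.out.println(c.trim());
    }
  }
}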
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Fri Aug 3 19:00:15 2012
@@ -117,7 +117,8 @@ public class TestMRWithDistributedCache
TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1,
symlinkFile.length());
- TestCase.assertFalse("second file should not be symlinked",
+ //This last one is a difference between MRv2 and MRv1
+ TestCase.assertTrue("second file should be symlinked too",
expectedAbsentSymlinkFile.exists());
}
}
@@ -145,7 +146,6 @@ public class TestMRWithDistributedCache
job.addFileToClassPath(second);
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
- job.createSymlink();
job.setMaxMapAttempts(1); // speed up failures
job.submit();
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java Fri Aug 3 19:00:15 2012
@@ -48,8 +48,12 @@ import org.apache.hadoop.mapreduce.Job;
* Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
* Jars may be optionally added to the classpath of the tasks, a rudimentary
* software distribution mechanism. Files have execution permissions.
- * Optionally users can also direct it to symlink the distributed cache file(s)
- * into the working directory of the task.
+ * In older versions of Hadoop Map/Reduce, users could optionally ask for
+ * symlinks to be created in the working directory of the child task. In the
+ * current version symlinks are always created. If the URL does not have a
+ * fragment, the name of the file or directory will be used. If multiple
+ * files or directories map to the same link name, the last one added will be
+ * used; all others will not even be downloaded.
*
* <code>DistributedCache</code> tracks modification timestamps of the cache
* files. Clearly the cache files should not be modified by the application
@@ -91,8 +95,7 @@ import org.apache.hadoop.mapreduce.Job;
*
* public void configure(JobConf job) {
* // Get the cached archives/files
- * localArchives = DistributedCache.getLocalCacheArchives(job);
- * localFiles = DistributedCache.getLocalCacheFiles(job);
+ * File f = new File("./map.zip/some/file/in/zip.txt");
* }
*
* public void map(K key, V value,
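[Editorial sketch] Per the rewritten javadoc, the URI fragment alone controls the link name in the task working directory. A hedged sketch of how a job names its cache entries (all paths made up):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheNaming {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    // Fragment chooses the link name: appears as ./map.zip in the task cwd.
    job.addCacheArchive(new URI("hdfs://nn:8020/cache/archive-v2.zip#map.zip"));
    // No fragment: the file's own name is used, i.e. ./lookup.dat.
    job.addCacheFile(new URI("hdfs://nn:8020/cache/lookup.dat"));
  }
}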
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Fri Aug 3 19:00:15 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -379,7 +380,8 @@ public class IFile {
private int readData(byte[] buf, int off, int len) throws IOException {
int bytesRead = 0;
while (bytesRead < len) {
- int n = in.read(buf, off+bytesRead, len-bytesRead);
+ int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+ len - bytesRead);
if (n < 0) {
return bytesRead;
}
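[Editorial sketch] The surrounding loop's contract is unchanged: keep reading until len bytes have arrived or the stream hits EOF. A plain-InputStream stand-in of that contract, independent of the Hadoop wrapper:

import java.io.IOException;
import java.io.InputStream;

class ReadLoopSketch {
  /** Read up to len bytes; returns the count read before EOF. */
  static int readFully(InputStream in, byte[] buf, int off, int len)
      throws IOException {
    int bytesRead = 0;
    while (bytesRead < len) {
      int n = in.read(buf, off + bytesRead, len - bytesRead);
      if (n < 0) {
        return bytesRead;  // EOF before len bytes: return what we have
      }
      bytesRead += n;      // a 0-byte read simply loops again
    }
    return bytesRead;
  }
}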
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java Fri Aug 3 19:00:15 2012
@@ -67,13 +67,13 @@ class IndexCache {
if (info == null) {
info = readIndexFileToCache(fileName, mapId, expectedIndexOwner);
} else {
- while (isUnderConstruction(info)) {
- try {
- // In case the entry is ready after the above check but
- // before the following wait, we do timed wait.
- info.wait(200);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted waiting for construction", e);
+ synchronized(info) {
+ while (isUnderConstruction(info)) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for construction", e);
+ }
}
}
LOG.debug("IndexCache HIT: MapId " + mapId + " found");
@@ -101,13 +101,13 @@ class IndexCache {
IndexInformation info;
IndexInformation newInd = new IndexInformation();
if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
- while (isUnderConstruction(info)) {
- try {
- // In case the entry is ready after the above check but
- // before the following wait, we do timed wait.
- info.wait(200);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted waiting for construction", e);
+ synchronized(info) {
+ while (isUnderConstruction(info)) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for construction", e);
+ }
}
}
LOG.debug("IndexCache HIT: MapId " + mapId + " found");
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java Fri Aug 3 19:00:15 2012
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -184,7 +185,7 @@ class JobQueueClient extends Configured
printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
if (showJobs && (jobQueueInfo.getChildren() == null ||
jobQueueInfo.getChildren().size() == 0)) {
- JobStatus[] jobs = jc.getJobsFromQueue(queue);
+ JobStatus[] jobs = jobQueueInfo.getJobStatuses();
if (jobs == null)
jobs = new JobStatus[0];
jc.displayJobList(jobs);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Fri Aug 3 19:00:15 2012
@@ -238,7 +238,7 @@ public class JobStatus extends org.apach
stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(),
stat.getCleanupProgress(), stat.getState().getValue(),
JobPriority.valueOf(stat.getPriority().name()),
- stat.getUsername(), stat.getJobName(), stat.getJobFile(),
+ stat.getUsername(), stat.getJobName(), stat.getQueue(), stat.getJobFile(),
stat.getTrackingUrl(), stat.isUber());
old.setStartTime(stat.getStartTime());
old.setFinishTime(stat.getFinishTime());
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java Fri Aug 3 19:00:15 2012
@@ -144,7 +144,7 @@ public interface Mapper
*
* Applications can use the {@link Reporter} provided to report progress
* or just indicate that they are alive. In scenarios where the application
- * takes an insignificant amount of time to process individual key/value
+ * takes a significant amount of time to process individual key/value
* pairs, this is crucial since the framework might assume that the task has
* timed-out and kill that task. The other way of avoiding this is to set
*
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java Fri Aug 3 19:00:15 2012
@@ -29,7 +29,7 @@ import org.apache.hadoop.util.Progressab
*
* {@link Mapper} and {@link Reducer} can use the Reporter
* provided to report progress or just indicate that they are alive. In
- * scenarios where the application takes an insignificant amount of time to
+ * scenarios where the application takes a significant amount of time to
* process individual key/value pairs, this is crucial since the framework
* might assume that the task has timed-out and kill that task.
*
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Fri Aug 3 19:00:15 2012
@@ -66,14 +66,6 @@ public class TaskLog {
// localFS is set in (and used by) writeToIndexFile()
static LocalFileSystem localFS = null;
- static {
- if (!LOG_DIR.exists()) {
- boolean b = LOG_DIR.mkdirs();
- if (!b) {
- LOG.debug("mkdirs failed. Ignoring.");
- }
- }
- }
public static String getMRv2LogDir() {
return System.getProperty(MRJobConfig.TASK_LOG_DIR);
@@ -638,6 +630,12 @@ public class TaskLog {
* @return base log directory
*/
static File getUserLogDir() {
+ if (!LOG_DIR.exists()) {
+ boolean b = LOG_DIR.mkdirs();
+ if (!b) {
+ LOG.debug("mkdirs failed. Ignoring.");
+ }
+ }
return LOG_DIR;
}
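[Editorial sketch] Moving the mkdirs out of the static initializer defers the filesystem side effect from class-load time to first use. A sketch of the lazy-create idiom; the directory path is a placeholder:

import java.io.File;

class LazyLogDir {
  private static final File LOG_DIR = new File("/tmp/userlogs"); // placeholder

  /** Create the directory on first use instead of at class-load time. */
  static File getUserLogDir() {
    if (!LOG_DIR.exists() && !LOG_DIR.mkdirs()) {
      // Benign: another thread or process may have created it first,
      // or creation genuinely failed and callers will see IO errors later.
    }
    return LOG_DIR;
  }
}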
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java Fri Aug 3 19:00:15 2012
@@ -313,7 +313,6 @@ public class Submitter extends Configure
// add default debug script only when executable is expressed as
// <path>#<executable>
if (exec.contains("#")) {
- DistributedCache.createSymlink(conf);
// set default gdb commands for map and reduce task
String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Fri Aug 3 19:00:15 2012
@@ -1049,9 +1049,10 @@ public class Job extends JobContextImpl
}
/**
- * This method allows you to create symlinks in the current working directory
- * of the task to all the cache files/archives
+ * Originally intended to enable symlinks, but currently symlinks cannot be
+ * disabled.
*/
+ @Deprecated
public void createSymlink() {
ensureState(JobState.DEFINE);
DistributedCache.createSymlink(conf);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java Fri Aug 3 19:00:15 2012
@@ -221,10 +221,11 @@ public interface JobContext extends MRJo
public String getUser();
/**
- * This method checks to see if symlinks are to be create for the
- * localized cache files in the current working directory
- * @return true if symlinks are to be created- else return false
+ * Originally intended to check if symlinks should be used, but currently
+ * symlinks cannot be disabled.
+ * @return true
*/
+ @Deprecated
public boolean getSymlink();
/**
@@ -251,14 +252,22 @@ public interface JobContext extends MRJo
* Return the path array of the localized caches
* @return A path array of localized caches
* @throws IOException
+ * @deprecated the array returned only includes the items that were
+ * downloaded. There is no way to map this to what is returned by
+ * {@link #getCacheArchives()}.
*/
+ @Deprecated
public Path[] getLocalCacheArchives() throws IOException;
/**
* Return the path array of the localized files
* @return A path array of localized files
* @throws IOException
+ * @deprecated the array returned only includes the items that were
+ * downloaded. There is no way to map this to what is returned by
+ * {@link #getCacheFiles()}.
*/
+ @Deprecated
public Path[] getLocalCacheFiles() throws IOException;
/**
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Aug 3 19:00:15 2012
@@ -135,8 +135,9 @@ class JobSubmitter {
short replication) throws IOException {
Configuration conf = job.getConfiguration();
if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
- LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
- "Applications should implement Tool for the same.");
+ LOG.warn("Hadoop command-line option parsing not performed. " +
+ "Implement the Tool interface and execute your application " +
+ "with ToolRunner to remedy this.");
}
// get all the command line arguments passed in by the user conf
@@ -189,7 +190,6 @@ class JobSubmitter {
//should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpFile, ue);
}
- DistributedCache.createSymlink(conf);
}
}
@@ -224,7 +224,6 @@ class JobSubmitter {
//should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpArchives, ue);
}
- DistributedCache.createSymlink(conf);
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Aug 3 19:00:15 2012
@@ -79,4 +79,9 @@ public interface MRConfig {
public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
public static final String MAX_BLOCK_LOCATIONS_KEY =
"mapreduce.job.max.split.locations";
+
+ public static final String SHUFFLE_SSL_ENABLED_KEY =
+ "mapreduce.shuffle.ssl.enabled";
+
+ public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
}
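[Editorial sketch] For completeness, how such a key/default pair is typically consumed; the literals below mirror the constants added above:

import org.apache.hadoop.conf.Configuration;

public class ShuffleSslCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    boolean sslEnabled = conf.getBoolean(
        "mapreduce.shuffle.ssl.enabled", false);  // key and default above
    System.out.println("shuffle SSL enabled: " + sslEnabled);
  }
}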
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Aug 3 19:00:15 2012
@@ -114,6 +114,10 @@ public interface MRJobConfig {
public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
+ /**
+ * @deprecated Symlinks are always on and cannot be disabled.
+ */
+ @Deprecated
public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";
public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Fri Aug 3 19:00:15 2012
@@ -55,8 +55,12 @@ import java.net.URI;
* Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
* Jars may be optionally added to the classpath of the tasks, a rudimentary
* software distribution mechanism. Files have execution permissions.
- * Optionally users can also direct it to symlink the distributed cache file(s)
- * into the working directory of the task.
+ * In older versions of Hadoop Map/Reduce users could optionally ask for symlinks
+ * to be created in the working directory of the child task. In the current
+ * version symlinks are always created. If the URL does not have a fragment,
+ * the name of the file or directory will be used. If multiple files or
+ * directories map to the same link name, the last one added will be used. All
+ * others will not even be downloaded.
*
 * <code>DistributedCache</code> tracks modification timestamps of the cache
* files. Clearly the cache files should not be modified by the application
@@ -98,8 +102,7 @@ import java.net.URI;
*
* public void configure(JobConf job) {
* // Get the cached archives/files
- * localArchives = DistributedCache.getLocalCacheArchives(job);
- * localFiles = DistributedCache.getLocalCacheFiles(job);
+ * File f = new File("./map.zip/some/file/in/zip.txt");
* }
*
* public void map(K key, V value,
@@ -375,32 +378,26 @@ public class DistributedCache {
}
/**
- * This method allows you to create symlinks in the current working directory
- * of the task to all the cache files/archives.
- * Intended to be used by user code.
+ * Originally intended to enable symlinks, but currently symlinks cannot be
+ * disabled. This is a NO-OP.
* @param conf the jobconf
- * @deprecated Use {@link Job#createSymlink()} instead
+ * @deprecated This is a NO-OP.
*/
@Deprecated
public static void createSymlink(Configuration conf){
- conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
+ //NOOP
}
/**
- * This method checks to see if symlinks are to be create for the
- * localized cache files in the current working directory
- * Used by internal DistributedCache code.
+ * Originally intended to check if symlinks should be used, but currently
+ * symlinks cannot be disabled.
* @param conf the jobconf
- * @return true if symlinks are to be created- else return false
- * @deprecated Use {@link JobContext#getSymlink()} instead
+ * @return true
+ * @deprecated Symlinks are always created.
*/
@Deprecated
public static boolean getSymlink(Configuration conf){
- String result = conf.get(MRJobConfig.CACHE_SYMLINK);
- if ("yes".equals(result)){
- return true;
- }
- return false;
+ return true;
}
private static boolean[] parseBooleans(String[] strs) {
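For illustration, with createSymlink() reduced to a no-op, link names come solely from URI fragments as the updated class javadoc describes. A minimal sketch of the current idiom (namenode address, path, and fragment are illustrative):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CacheSetup {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "cache example");
        // The fragment ("dict") names the symlink in the task working
        // directory; with no fragment, the file name ("terms.txt") is used.
        job.addCacheFile(new URI("hdfs://nn:8020/data/terms.txt#dict"));
        // Two URIs sharing one link name collide: only the last one added
        // is downloaded and linked.
      }
    }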
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Aug 3 19:00:15 2012
@@ -21,15 +21,18 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
-import java.net.HttpURLConnection;
import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
+import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,14 +45,17 @@ import org.apache.hadoop.mapred.Counters
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
-@SuppressWarnings({"deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -92,6 +98,9 @@ class Fetcher extends Thread {
private volatile boolean stopped = false;
+ private static boolean sslShuffle;
+ private static SSLFactory sslFactory;
+
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
@@ -135,6 +144,20 @@ class Fetcher extends Thread {
setName("fetcher#" + id);
setDaemon(true);
+
+ synchronized (Fetcher.class) {
+ sslShuffle = job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+ MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
+ if (sslShuffle && sslFactory == null) {
+ sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
+ try {
+ sslFactory.init();
+ } catch (Exception ex) {
+ sslFactory.destroy();
+ throw new RuntimeException(ex);
+ }
+ }
+ }
}
public void run() {
@@ -173,15 +196,34 @@ class Fetcher extends Thread {
} catch (InterruptedException ie) {
LOG.warn("Got interrupt while joining " + getName(), ie);
}
+ if (sslFactory != null) {
+ sslFactory.destroy();
+ }
}
+ @VisibleForTesting
+ protected HttpURLConnection openConnection(URL url) throws IOException {
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ if (sslShuffle) {
+ HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
+ try {
+ httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
+ } catch (GeneralSecurityException ex) {
+ throw new IOException(ex);
+ }
+ httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
+ }
+ return conn;
+ }
+
/**
* The crux of the matter...
*
* @param host {@link MapHost} from which we need to
* shuffle available map-outputs.
*/
- private void copyFromHost(MapHost host) throws IOException {
+ @VisibleForTesting
+ protected void copyFromHost(MapHost host) throws IOException {
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
@@ -191,9 +233,9 @@ class Fetcher extends Thread {
return;
}
- LOG.debug("Fetcher " + id + " going to fetch from " + host);
- for (TaskAttemptID tmp: maps) {
- LOG.debug(tmp);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+ + maps);
}
// List of maps to be fetched yet
@@ -205,7 +247,7 @@ class Fetcher extends Thread {
try {
URL url = getMapOutputURL(host, maps);
- HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+ HttpURLConnection connection = openConnection(url);
// generate hash of the url
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
@@ -266,17 +308,25 @@ class Fetcher extends Thread {
try {
// Loop through available map-outputs and fetch them
- // On any error, good becomes false and we exit after putting back
- // the remaining maps to the yet_to_be_fetched list
- boolean good = true;
- while (!remaining.isEmpty() && good) {
- good = copyMapOutput(host, input, remaining);
+ // On any error, failedTasks is not null and we exit
+ // after putting back the remaining maps to the
+ // yet_to_be_fetched list and marking the failed tasks.
+ TaskAttemptID[] failedTasks = null;
+ while (!remaining.isEmpty() && failedTasks == null) {
+ failedTasks = copyMapOutput(host, input, remaining);
+ }
+
+ if(failedTasks != null && failedTasks.length > 0) {
+ LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+ for(TaskAttemptID left: failedTasks) {
+ scheduler.copyFailed(left, host, true);
+ }
}
IOUtils.cleanup(LOG, input);
// Sanity check
- if (good && !remaining.isEmpty()) {
+ if (failedTasks == null && !remaining.isEmpty()) {
throw new IOException("server didn't return all expected map outputs: "
+ remaining.size() + " left.");
}
@@ -285,10 +335,11 @@ class Fetcher extends Thread {
scheduler.putBackKnownMapOutput(host, left);
}
}
-
- }
+ }
+
+ private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0];
- private boolean copyMapOutput(MapHost host,
+ private TaskAttemptID[] copyMapOutput(MapHost host,
DataInputStream input,
Set<TaskAttemptID> remaining) {
MapOutput<K,V> mapOutput = null;
@@ -310,18 +361,21 @@ class Fetcher extends Thread {
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
- return false;
+ //Don't know which one was bad, so consider all of them as bad
+ return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
- return false;
+ return new TaskAttemptID[] {mapId};
}
- LOG.debug("header: " + mapId + ", len: " + compressedLength +
- ", decomp len: " + decompressedLength);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("header: " + mapId + ", len: " + compressedLength +
+ ", decomp len: " + decompressedLength);
+ }
// Get the location for the map output - either in-memory or on-disk
mapOutput = merger.reserve(mapId, decompressedLength, id);
@@ -329,7 +383,8 @@ class Fetcher extends Thread {
// Check if we can shuffle *now* ...
if (mapOutput.getType() == Type.WAIT) {
LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
- return false;
+ //Not an error but wait to process data.
+ return EMPTY_ATTEMPT_ID_ARRAY;
}
// Go!
@@ -351,24 +406,27 @@ class Fetcher extends Thread {
// Note successful shuffle
remaining.remove(mapId);
metrics.successFetch();
- return true;
+ return null;
} catch (IOException ioe) {
ioErrs.increment(1);
if (mapId == null || mapOutput == null) {
LOG.info("fetcher#" + id + " failed to read map header" +
mapId + " decomp: " +
decompressedLength + ", " + compressedLength, ioe);
- return false;
+ if(mapId == null) {
+ return remaining.toArray(new TaskAttemptID[remaining.size()]);
+ } else {
+ return new TaskAttemptID[] {mapId};
+ }
}
- LOG.info("Failed to shuffle output of " + mapId +
+ LOG.warn("Failed to shuffle output of " + mapId +
" from " + host.getHostName(), ioe);
// Inform the shuffle-scheduler
mapOutput.abort();
- scheduler.copyFailed(mapId, host, true);
metrics.failedFetch();
- return false;
+ return new TaskAttemptID[] {mapId};
}
}
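For illustration, the @VisibleForTesting openConnection(URL) refactor gives tests a seam for interposing on the HTTP layer. A self-contained sketch of the same pattern outside Fetcher (class names are illustrative, not the actual test from this commit):

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    class Downloader {
      // Protected seam, analogous to Fetcher#openConnection above.
      protected HttpURLConnection openConnection(URL url) throws IOException {
        return (HttpURLConnection) url.openConnection();
      }

      int fetchResponseCode(URL url) throws IOException {
        return openConnection(url).getResponseCode();
      }
    }

    class StubbedDownloader extends Downloader {
      @Override
      protected HttpURLConnection openConnection(final URL url) {
        // Canned connection: no network I/O, fixed response code.
        return new HttpURLConnection(url) {
          @Override public void connect() {}
          @Override public void disconnect() {}
          @Override public boolean usingProxy() { return false; }
          @Override public int getResponseCode() { return 200; }
        };
      }
    }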
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Fri Aug 3 19:00:15 2012
@@ -246,8 +246,6 @@ public class ConfigUtil {
new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS});
Configuration.addDeprecation("mapred.cache.archives.timestamps",
new String[] {MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS});
- Configuration.addDeprecation("mapred.create.symlink",
- new String[] {MRJobConfig.CACHE_SYMLINK});
Configuration.addDeprecation("mapred.working.dir",
new String[] {MRJobConfig.WORKING_DIR});
Configuration.addDeprecation("user.name",
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Aug 3 19:00:15 2012
@@ -513,6 +513,21 @@
+<property>
+  <name>mapreduce.shuffle.ssl.enabled</name>
+  <value>false</value>
+  <description>
+    Whether to use SSL for the Shuffle HTTP endpoints.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.shuffle.ssl.file.buffer.size</name>
+  <value>65536</value>
+  <description>Buffer size for reading spills from file when using SSL.
+  </description>
+</property>
+
<name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value>
<description>The percentage of memory -relative to the maximum heap size- to
@@ -692,7 +707,7 @@
- <name>mapreduce.output.fileoutputformat.compression.type</name>
+ <name>mapreduce.output.fileoutputformat.compress.type</name>
<value>RECORD</value>
<description>If the job outputs are to be compressed as SequenceFiles, how should
they be compressed? Should be one of NONE, RECORD or BLOCK.
@@ -700,7 +715,7 @@
- <name>mapreduce.output.fileoutputformat.compression.codec</name>
+ <name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.DefaultCodec</value>
<description>If the job outputs are compressed, how should they be compressed?
@@ -1285,7 +1300,7 @@
<name>mapreduce.jobhistory.address</name>
<value>0.0.0.0:10020</value>
- <description>MapReduce JobHistory Server host:port</description>
+ <description>MapReduce JobHistory Server IPC host:port</description>
@@ -1294,4 +1309,21 @@
<description>MapReduce JobHistory Server Web UI host:port</description>
</property>
+
+<property>
+  <name>mapreduce.jobhistory.keytab</name>
+  <description>
+    Location of the kerberos keytab file for the MapReduce
+    JobHistory Server.
+  </description>
+  <value>/etc/security/keytab/jhs.service.keytab</value>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.principal</name>
+  <description>
+    Kerberos principal name for the MapReduce JobHistory Server.
+  </description>
+  <value>jhs/_HOST@REALM.TLD</value>
+</property>
+
</configuration>
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1358480-1369130
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java Fri Aug 3 19:00:15 2012
@@ -35,16 +35,23 @@ import org.apache.hadoop.mapreduce.serve
import junit.framework.TestCase;
public class TestIndexCache extends TestCase {
+ private JobConf conf;
+ private FileSystem fs;
+ private Path p;
+
+ @Override
+ public void setUp() throws IOException {
+ conf = new JobConf();
+ fs = FileSystem.getLocal(conf).getRaw();
+ p = new Path(System.getProperty("test.build.data", "/tmp"),
+ "cache").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ }
public void testLRCPolicy() throws Exception {
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("seed: " + seed);
- JobConf conf = new JobConf();
- FileSystem fs = FileSystem.getLocal(conf).getRaw();
- Path p = new Path(System.getProperty("test.build.data", "/tmp"),
- "cache").makeQualified(fs);
fs.delete(p, true);
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
final int partsPerMap = 1000;
@@ -115,10 +122,6 @@ public class TestIndexCache extends Test
public void testBadIndex() throws Exception {
final int parts = 30;
- JobConf conf = new JobConf();
- FileSystem fs = FileSystem.getLocal(conf).getRaw();
- Path p = new Path(System.getProperty("test.build.data", "/tmp"),
- "cache").makeQualified(fs);
fs.delete(p, true);
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
IndexCache cache = new IndexCache(conf);
@@ -150,10 +153,6 @@ public class TestIndexCache extends Test
}
public void testInvalidReduceNumberOrLength() throws Exception {
- JobConf conf = new JobConf();
- FileSystem fs = FileSystem.getLocal(conf).getRaw();
- Path p = new Path(System.getProperty("test.build.data", "/tmp"),
- "cache").makeQualified(fs);
fs.delete(p, true);
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
final int partsPerMap = 1000;
@@ -199,10 +198,6 @@ public class TestIndexCache extends Test
// This test case may not be repeatable. But on my MacBook this test
// fails with probability of 100% on code before MAPREDUCE-2541,
// so it is repeatable in practice.
- JobConf conf = new JobConf();
- FileSystem fs = FileSystem.getLocal(conf).getRaw();
- Path p = new Path(System.getProperty("test.build.data", "/tmp"),
- "cache").makeQualified(fs);
fs.delete(p, true);
conf.setInt(TTConfig.TT_INDEX_CACHE, 10);
// Make a big file so removeMapThread almost surely runs faster than
@@ -247,6 +242,66 @@ public class TestIndexCache extends Test
}
}
+ public void testCreateRace() throws Exception {
+ fs.delete(p, true);
+ conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
+ final int partsPerMap = 1000;
+ final int bytesPerFile = partsPerMap * 24;
+ final IndexCache cache = new IndexCache(conf);
+
+ final Path racy = new Path(p, "racyIndex");
+ final String user =
+ UserGroupInformation.getCurrentUser().getShortUserName();
+ writeFile(fs, racy, bytesPerFile, partsPerMap);
+
+ // run multiple instances
+ Thread[] getInfoThreads = new Thread[50];
+ for (int i = 0; i < 50; i++) {
+ getInfoThreads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ cache.getIndexInformation("racyIndex", partsPerMap, racy, user);
+ cache.removeMap("racyIndex");
+ } catch (Exception e) {
+ // should not be here
+ }
+ }
+ };
+ }
+
+ for (int i = 0; i < 50; i++) {
+ getInfoThreads[i].start();
+ }
+
+ final Thread mainTestThread = Thread.currentThread();
+
+ Thread timeoutThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(15000);
+ mainTestThread.interrupt();
+ } catch (InterruptedException ie) {
+ // we are done;
+ }
+ }
+ };
+
+ for (int i = 0; i < 50; i++) {
+ try {
+ getInfoThreads[i].join();
+ } catch (InterruptedException ie) {
+ // we haven't finished in time. Potential deadlock/race.
+ fail("Unexpectedly long delay during concurrent cache entry creations");
+ }
+ }
+ // stop the timeoutThread. If we get interrupted before stopping, there
+ // must be something wrong, although it wasn't a deadlock. No need to
+ // catch and swallow.
+ timeoutThread.interrupt();
+ }
+
private static void checkRecord(IndexRecord rec, long fill) {
assertEquals(fill, rec.startOffset);
assertEquals(fill, rec.rawLength);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Fri Aug 3 19:00:15 2012
@@ -336,7 +336,7 @@ public class HistoryFileManager extends
public synchronized Configuration loadConfFile() throws IOException {
FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
Configuration jobConf = new Configuration(false);
- jobConf.addResource(fc.open(confFile));
+ jobConf.addResource(fc.open(confFile), confFile.toString());
return jobConf;
}
}
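For illustration, naming the stream gives Configuration a provenance string for diagnostics, so a malformed or deprecated property can be attributed to the originating file. A small self-contained sketch (the property content and resource name are illustrative):

    import java.io.ByteArrayInputStream;

    import org.apache.hadoop.conf.Configuration;

    public class NamedResourceDemo {
      public static void main(String[] args) throws Exception {
        byte[] xml = "<configuration/>".getBytes("UTF-8");
        Configuration conf = new Configuration(false);
        // The second argument names the resource so warnings and errors
        // can point at "job.xml" instead of an anonymous input stream.
        conf.addResource(new ByteArrayInputStream(xml), "job.xml");
      }
    }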
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Fri Aug 3 19:00:15 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.service.CompositeService;
@@ -122,6 +123,7 @@ public class JobHistoryServer extends Co
}
public static void main(String[] args) {
+ Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
try {
JobHistoryServer jobHistoryServer = new JobHistoryServer();
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java Fri Aug 3 19:00:15 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.h
import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
@@ -60,10 +61,10 @@ public class HsWebApp extends WebApp imp
route(pajoin("/singletaskcounter",TASK_ID, COUNTER_GROUP, COUNTER_NAME),
HsController.class, "singleTaskCounter");
route("/about", HsController.class, "about");
- route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
- HsController.class, "logs");
- route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
- HsController.class, "nmlogs");
+ route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
+ CONTAINER_LOG_TYPE), HsController.class, "logs");
+ route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
+ CONTAINER_LOG_TYPE), HsController.class, "nmlogs");
}
}