Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CDD35D85A for ; Fri, 3 Aug 2012 19:02:02 +0000 (UTC) Received: (qmail 18752 invoked by uid 500); 3 Aug 2012 19:02:02 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 18640 invoked by uid 500); 3 Aug 2012 19:02:02 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 18315 invoked by uid 99); 3 Aug 2012 19:02:01 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Aug 2012 19:02:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Aug 2012 19:01:51 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DA77D2388AA9; Fri, 3 Aug 2012 19:00:59 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1369164 [2/7] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduc... 
Date: Fri, 03 Aug 2012 19:00:51 -0000 To: mapreduce-commits@hadoop.apache.org From: gkesavan@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120803190059.DA77D2388AA9@eris.apache.org> Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Aug 3 19:00:15 2012 @@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -47,9 +46,6 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; -import org.apache.hadoop.mapreduce.v2.app.job.Job; -import org.apache.hadoop.mapreduce.v2.app.job.Task; -import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; 
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; @@ -131,6 +127,7 @@ public class RMContainerAllocator extend private int containersReleased = 0; private int hostLocalAssigned = 0; private int rackLocalAssigned = 0; + private int lastCompletedTasks = 0; private boolean recalculateReduceSchedule = false; private int mapResourceReqt;//memory @@ -214,11 +211,18 @@ public class RMContainerAllocator extend scheduledRequests.assign(allocatedContainers); LOG.info("After Assign: " + getStat()); } - + + int completedMaps = getJob().getCompletedMaps(); + int completedTasks = completedMaps + getJob().getCompletedReduces(); + if (lastCompletedTasks != completedTasks) { + lastCompletedTasks = completedTasks; + recalculateReduceSchedule = true; + } + if (recalculateReduceSchedule) { preemptReducesIfNeeded(); scheduleReduces( - getJob().getTotalMaps(), getJob().getCompletedMaps(), + getJob().getTotalMaps(), completedMaps, scheduledRequests.maps.size(), scheduledRequests.reduces.size(), assignedRequests.maps.size(), assignedRequests.reduces.size(), mapResourceReqt, reduceResourceReqt, Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java (original) +++ 
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java Fri Aug 3 19:00:15 2012 @@ -78,14 +78,29 @@ public class ConfBlock extends HtmlBlock tr(). th(_TH, "key"). th(_TH, "value"). + th(_TH, "source chain"). _(). _(). tbody(); for (ConfEntryInfo entry : info.getProperties()) { + StringBuffer buffer = new StringBuffer(); + String[] sources = entry.getSource(); + //Skip the last entry, because it is always the same HDFS file, and + // output them in reverse order so most recent is output first + boolean first = true; + for(int i = (sources.length - 2); i >= 0; i--) { + if(!first) { + // \u2B05 is an arrow <-- + buffer.append(" \u2B05 "); + } + first = false; + buffer.append(sources[i]); + } tbody. tr(). td(entry.getName()). td(entry.getValue()). + td(buffer.toString()). _(); } tbody._(). @@ -93,6 +108,7 @@ public class ConfBlock extends HtmlBlock tr(). th().input("search_init").$type(InputType.text).$name("key").$value("key")._()._(). th().input("search_init").$type(InputType.text).$name("value").$value("value")._()._(). + th().input("search_init").$type(InputType.text).$name("source chain").$value("source chain")._()._(). _(). _(). 
_(); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java Fri Aug 3 19:00:15 2012 @@ -64,7 +64,7 @@ public class AMAttemptInfo { if (containerId != null) { this.containerId = containerId.toString(); this.logsLink = join("http://" + nodeHttpAddress, - ujoin("node", "containerlogs", this.containerId)); + ujoin("node", "containerlogs", this.containerId, user)); } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- 
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java Fri Aug 3 19:00:15 2012 @@ -27,13 +27,19 @@ public class ConfEntryInfo { protected String name; protected String value; + protected String[] source; public ConfEntryInfo() { } public ConfEntryInfo(String key, String value) { + this(key, value, null); + } + + public ConfEntryInfo(String key, String value, String[] source) { this.name = key; this.value = value; + this.source = source; } public String getName() { @@ -43,4 +49,8 @@ public class ConfEntryInfo { public String getValue() { return this.value; } + + public String[] getSource() { + return source; + } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java Fri Aug 3 19:00:15 2012 @@ -46,7 +46,8 @@ public class ConfInfo { 
Configuration jobConf = job.loadConfFile(); this.path = job.getConfFile().toString(); for (Map.Entry entry : jobConf) { - this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue())); + this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue(), + jobConf.getPropertySources(entry.getKey()))); } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Aug 3 19:00:15 2012 @@ -603,7 +603,7 @@ public class MockJobs extends MockApps { public Configuration loadConfFile() throws IOException { FileContext fc = FileContext.getFileContext(configFile.toUri(), conf); Configuration jobConf = new Configuration(false); - jobConf.addResource(fc.open(configFile)); + jobConf.addResource(fc.open(configFile), configFile.toString()); return jobConf; } }; Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Aug 3 19:00:15 2012 @@ -29,6 +29,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -86,11 +87,13 @@ import org.apache.hadoop.yarn.factories. 
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Test; @@ -352,7 +355,7 @@ public class TestRMContainerAllocator { } @Override protected ResourceScheduler createScheduler() { - return new MyFifoScheduler(); + return new MyFifoScheduler(this.getRMContext()); } } @@ -1091,6 +1094,19 @@ public class TestRMContainerAllocator { } private static class MyFifoScheduler extends FifoScheduler { + + public MyFifoScheduler(RMContext rmContext) { + super(); + try { + Configuration conf = new Configuration(); + reinitialize(conf, new ContainerTokenSecretManager(conf), + rmContext); + } catch (IOException ie) { + LOG.info("add application failed with ", ie); + assert (false); + } + } + // override this to copy the objects otherwise FifoScheduler updates the // numContainers in same objects as kept by RMContainerAllocator @Override @@ -1393,7 +1409,63 @@ public class TestRMContainerAllocator { maxReduceRampupLimit, reduceSlowStart); verify(allocator).rampDownReduces(anyInt()); } + + private static class RecalculateContainerAllocator extends MyContainerAllocator { + public boolean recalculatedReduceSchedule = false; + + public RecalculateContainerAllocator(MyResourceManager rm, + Configuration conf, ApplicationAttemptId 
appAttemptId, Job job) { + super(rm, conf, appAttemptId, job); + } + + @Override + public void scheduleReduces(int totalMaps, int completedMaps, + int scheduledMaps, int scheduledReduces, int assignedMaps, + int assignedReduces, int mapResourceReqt, int reduceResourceReqt, + int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) { + recalculatedReduceSchedule = true; + } + } + @Test + public void testCompletedTasksRecalculateSchedule() throws Exception { + LOG.info("Running testCompletedTasksRecalculateSchedule"); + + Configuration conf = new Configuration(); + final MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job job = mock(Job.class); + when(job.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false)); + doReturn(10).when(job).getTotalMaps(); + doReturn(10).when(job).getTotalReduces(); + doReturn(0).when(job).getCompletedMaps(); + RecalculateContainerAllocator allocator = + new RecalculateContainerAllocator(rm, conf, appAttemptId, job); + allocator.schedule(); + + allocator.recalculatedReduceSchedule = false; + allocator.schedule(); + Assert.assertFalse("Unexpected recalculate of reduce schedule", + allocator.recalculatedReduceSchedule); + + doReturn(1).when(job).getCompletedMaps(); + allocator.schedule(); + Assert.assertTrue("Expected recalculate of reduce schedule", + allocator.recalculatedReduceSchedule); + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple(); @@ -1402,6 +1474,7 @@ public class 
TestRMContainerAllocator { t.testReportedAppProgress(); t.testReportedAppProgressWithOnlyMaps(); t.testBlackListedNodes(); + t.testCompletedTasksRecalculateSchedule(); } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Fri Aug 3 19:00:15 2012 @@ -565,6 +565,73 @@ public class TestTaskAttempt{ assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", eventHandler.internalError); } + + @Test + public void testDoubleTooManyFetchFailure() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf 
jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(clusterInfo.getMinContainerCapability()).thenReturn(resource); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener, + mock(OutputCommitter.class), mock(Token.class), new Credentials(), + new SystemClock(), appCtx); + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_DONE)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + + assertEquals("Task attempt is not in succeeded state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); + assertEquals("Task attempt is not in FAILED state", taImpl.getState(), + 
TaskAttemptState.FAILED); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); + assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(), + TaskAttemptState.FAILED); + assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", + eventHandler.internalError); + } public static class MockEventHandler implements EventHandler { public boolean internalError; Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Aug 3 19:00:15 2012 @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -127,8 +128,13 @@ public class TestTaskImpl { @Override protected int getMaxAttempts() { return 100; - } - + } + + @Override + protected void internalError(TaskEventType type) { + super.internalError(type); + fail("Internal error: " + type); + } } private class 
MockTaskAttemptImpl extends TaskAttemptImpl { @@ -462,5 +468,32 @@ public class TestTaskImpl { assertTaskSucceededState(); } + + @Test + public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + updateLastAttemptState(TaskAttemptState.RUNNING); + + // Add a speculative task attempt that succeeds + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(getLastAttempt().getAttemptId()); + commitTaskAttempt(getLastAttempt().getAttemptId()); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + // The task should now have succeeded + assertTaskSucceededState(); + + // Now fail the first task attempt, after the second has succeeded + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + // The task should still be in the succeeded state + assertTaskSucceededState(); + + } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java (original) +++ 
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java Fri Aug 3 19:00:15 2012 @@ -972,7 +972,8 @@ public class TestAMWebServicesJobs exten WebServicesTestUtils.checkStringMatch("containerId", amInfo .getContainerId().toString(), containerId); - String localLogsLink = ujoin("node", "containerlogs", containerId); + String localLogsLink =ujoin("node", "containerlogs", containerId, + job.getUserName()); assertTrue("logsLink", logsLink.contains(localLogsLink)); } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Fri Aug 3 19:00:15 2012 @@ -18,12 +18,9 @@ package org.apache.hadoop.mapred; -import com.google.common.collect.Maps; - import java.io.File; import java.io.IOException; import java.net.MalformedURLException; -import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLClassLoader; @@ -34,6 +31,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import 
java.util.Map; +import java.util.Map.Entry; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.FSDownload; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -85,6 +84,8 @@ class LocalDistributedCacheManager { * @throws IOException */ public void setup(JobConf conf) throws IOException { + File workDir = new File(System.getProperty("user.dir")); + // Generate YARN local resources objects corresponding to the distributed // cache configuration Map localResources = @@ -132,7 +133,8 @@ class LocalDistributedCacheManager { Future future = exec.submit(download); resourcesToPaths.put(resource, future); } - for (LocalResource resource : localResources.values()) { + for (Entry entry : localResources.entrySet()) { + LocalResource resource = entry.getValue(); Path path; try { path = resourcesToPaths.get(resource).get(); @@ -142,6 +144,10 @@ class LocalDistributedCacheManager { throw new IOException(e); } String pathString = path.toUri().toString(); + String link = entry.getKey(); + String target = new File(path.toUri()).getPath(); + symlink(workDir, target, link); + if (resource.getType() == LocalResourceType.ARCHIVE) { localArchives.add(pathString); } else if (resource.getType() == LocalResourceType.FILE) { @@ -175,27 +181,6 @@ class LocalDistributedCacheManager { .arrayToString(localFiles.toArray(new String[localArchives .size()]))); } - if (DistributedCache.getSymlink(conf)) { - File workDir = new File(System.getProperty("user.dir")); - URI[] archives = DistributedCache.getCacheArchives(conf); - URI[] files = DistributedCache.getCacheFiles(conf); - Path[] localArchives = DistributedCache.getLocalCacheArchives(conf); - Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); - if (archives != null) { - for 
(int i = 0; i < archives.length; i++) { - String link = archives[i].getFragment(); - String target = new File(localArchives[i].toUri()).getPath(); - symlink(workDir, target, link); - } - } - if (files != null) { - for (int i = 0; i < files.length; i++) { - String link = files[i].getFragment(); - String target = new File(localFiles[i].toUri()).getPath(); - symlink(workDir, target, link); - } - } - } setupCalled = true; } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Fri Aug 3 19:00:15 2012 @@ -383,6 +383,7 @@ public class TypeConverter { switch (yarnApplicationState) { case NEW: case SUBMITTED: + case ACCEPTED: return State.PREP; case RUNNING: return State.RUNNING; Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Fri Aug 3 19:00:15 2012 @@ -133,5 +133,16 @@ public class JHAdminConfig { * The HistoryStorage class to use to cache history data. */ public static final String MR_HISTORY_STORAGE = - MR_HISTORY_PREFIX + ".store.class"; + MR_HISTORY_PREFIX + "store.class"; + + /** Whether to use fixed ports with the minicluster. */ + public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX + + "minicluster.fixed.ports"; + + /** + * Default is false to be able to run tests concurrently without port + * conflicts. 
+ */ + public static boolean DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS = false; + } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Fri Aug 3 19:00:15 2012 @@ -171,8 +171,9 @@ public class MRApps extends Apps { } // Add standard Hadoop classes - for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH) - .split(",")) { + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c .trim()); } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- 
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Fri Aug 3 19:00:15 2012 @@ -117,7 +117,8 @@ public class TestMRWithDistributedCache TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1, symlinkFile.length()); - TestCase.assertFalse("second file should not be symlinked", + //This last one is a difference between MRv2 and MRv1 + TestCase.assertTrue("second file should be symlinked too", expectedAbsentSymlinkFile.exists()); } } @@ -145,7 +146,6 @@ public class TestMRWithDistributedCache job.addFileToClassPath(second); job.addArchiveToClassPath(third); job.addCacheArchive(fourth.toUri()); - job.createSymlink(); job.setMaxMapAttempts(1); // speed up failures job.submit(); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java Fri Aug 3 19:00:15 2012 @@ -48,8 +48,12 
@@ import org.apache.hadoop.mapreduce.Job; * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. * Jars may be optionally added to the classpath of the tasks, a rudimentary * software distribution mechanism. Files have execution permissions. - * Optionally users can also direct it to symlink the distributed cache file(s) - * into the working directory of the task.

+ * In older versions of Hadoop Map/Reduce users could optionally ask for symlinks + * to be created in the working directory of the child task. In the current + * version symlinks are always created. If the URL does not have a fragment + * the name of the file or directory will be used. If multiple files or + * directories map to the same link name, the last one added will be used. All + * others will not even be downloaded.

* *

DistributedCache tracks modification timestamps of the cache * files. Clearly the cache files should not be modified by the application @@ -91,8 +95,7 @@ import org.apache.hadoop.mapreduce.Job; * * public void configure(JobConf job) { * // Get the cached archives/files - * localArchives = DistributedCache.getLocalCacheArchives(job); - * localFiles = DistributedCache.getLocalCacheFiles(job); + * File f = new File("./map.zip/some/file/in/zip.txt"); * } * * public void map(K key, V value, Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Fri Aug 3 19:00:15 2012 @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -379,7 +380,8 @@ public class IFile { private int readData(byte[] buf, int off, int len) throws IOException { int bytesRead = 0; while (bytesRead < len) { - int n = in.read(buf, off+bytesRead, len-bytesRead); + int n = 
IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead, + len - bytesRead); if (n < 0) { return bytesRead; } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java Fri Aug 3 19:00:15 2012 @@ -67,13 +67,13 @@ class IndexCache { if (info == null) { info = readIndexFileToCache(fileName, mapId, expectedIndexOwner); } else { - while (isUnderConstruction(info)) { - try { - // In case the entry is ready after the above check but - // before the following wait, we do timed wait. 
- info.wait(200); - } catch (InterruptedException e) { - throw new IOException("Interrupted waiting for construction", e); + synchronized(info) { + while (isUnderConstruction(info)) { + try { + info.wait(); + } catch (InterruptedException e) { + throw new IOException("Interrupted waiting for construction", e); + } } } LOG.debug("IndexCache HIT: MapId " + mapId + " found"); @@ -101,13 +101,13 @@ class IndexCache { IndexInformation info; IndexInformation newInd = new IndexInformation(); if ((info = cache.putIfAbsent(mapId, newInd)) != null) { - while (isUnderConstruction(info)) { - try { - // In case the entry is ready after the above check but - // before the following wait, we do timed wait. - info.wait(200); - } catch (InterruptedException e) { - throw new IOException("Interrupted waiting for construction", e); + synchronized(info) { + while (isUnderConstruction(info)) { + try { + info.wait(); + } catch (InterruptedException e) { + throw new IOException("Interrupted waiting for construction", e); + } } } LOG.debug("IndexCache HIT: MapId " + mapId + " found"); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java Fri Aug 3 19:00:15 2012 @@ 
-25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -184,7 +185,7 @@ class JobQueueClient extends Configured printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out)); if (showJobs && (jobQueueInfo.getChildren() == null || jobQueueInfo.getChildren().size() == 0)) { - JobStatus[] jobs = jc.getJobsFromQueue(queue); + JobStatus[] jobs = jobQueueInfo.getJobStatuses(); if (jobs == null) jobs = new JobStatus[0]; jc.displayJobList(jobs); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Fri Aug 3 19:00:15 2012 @@ -238,7 +238,7 @@ public class JobStatus extends org.apach stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(), stat.getCleanupProgress(), stat.getState().getValue(), JobPriority.valueOf(stat.getPriority().name()), - stat.getUsername(), stat.getJobName(), stat.getJobFile(), + stat.getUsername(), stat.getJobName(), stat.getQueue(), stat.getJobFile(), stat.getTrackingUrl(), stat.isUber()); 
old.setStartTime(stat.getStartTime()); old.setFinishTime(stat.getFinishTime()); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java Fri Aug 3 19:00:15 2012 @@ -144,7 +144,7 @@ public interface Mapper * *

Applications can use the {@link Reporter} provided to report progress * or just indicate that they are alive. In scenarios where the application - * takes an insignificant amount of time to process individual key/value + * takes significant amount of time to process individual key/value * pairs, this is crucial since the framework might assume that the task has * timed-out and kill that task. The other way of avoiding this is to set * Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reporter.java Fri Aug 3 19:00:15 2012 @@ -29,7 +29,7 @@ import org.apache.hadoop.util.Progressab * *

{@link Mapper} and {@link Reducer} can use the Reporter * provided to report progress or just indicate that they are alive. In - * scenarios where the application takes an insignificant amount of time to + * scenarios where the application takes significant amount of time to * process individual key/value pairs, this is crucial since the framework * might assume that the task has timed-out and kill that task. * Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Fri Aug 3 19:00:15 2012 @@ -66,14 +66,6 @@ public class TaskLog { // localFS is set in (and used by) writeToIndexFile() static LocalFileSystem localFS = null; - static { - if (!LOG_DIR.exists()) { - boolean b = LOG_DIR.mkdirs(); - if (!b) { - LOG.debug("mkdirs failed. Ignoring."); - } - } - } public static String getMRv2LogDir() { return System.getProperty(MRJobConfig.TASK_LOG_DIR); @@ -638,6 +630,12 @@ public class TaskLog { * @return base log directory */ static File getUserLogDir() { + if (!LOG_DIR.exists()) { + boolean b = LOG_DIR.mkdirs(); + if (!b) { + LOG.debug("mkdirs failed. 
Ignoring."); + } + } return LOG_DIR; } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java Fri Aug 3 19:00:15 2012 @@ -313,7 +313,6 @@ public class Submitter extends Configure // add default debug script only when executable is expressed as // # if (exec.contains("#")) { - DistributedCache.createSymlink(conf); // set default gdb commands for map and reduce task String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script"; setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- 
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Fri Aug 3 19:00:15 2012 @@ -1049,9 +1049,10 @@ public class Job extends JobContextImpl } /** - * This method allows you to create symlinks in the current working directory - * of the task to all the cache files/archives + * Originally intended to enable symlinks, but currently symlinks cannot be + * disabled. */ + @Deprecated public void createSymlink() { ensureState(JobState.DEFINE); DistributedCache.createSymlink(conf); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java Fri Aug 3 19:00:15 2012 @@ -221,10 +221,11 @@ public interface JobContext extends MRJo public String getUser(); /** - * This method checks to see if symlinks are to be create for the - * localized cache files in the current working directory - * @return true if symlinks are to be created- else return false + * Originally 
intended to check if symlinks should be used, but currently + * symlinks cannot be disabled. + * @return true */ + @Deprecated public boolean getSymlink(); /** @@ -251,14 +252,22 @@ public interface JobContext extends MRJo * Return the path array of the localized caches * @return A path array of localized caches * @throws IOException + * @deprecated the array returned only includes the items that were + * downloaded. There is no way to map this to what is returned by + * {@link #getCacheArchives()}. */ + @Deprecated public Path[] getLocalCacheArchives() throws IOException; /** * Return the path array of the localized files * @return A path array of localized files * @throws IOException + * @deprecated the array returned only includes the items that were + * downloaded. There is no way to map this to what is returned by + * {@link #getCacheFiles()}. */ + @Deprecated public Path[] getLocalCacheFiles() throws IOException; /** Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Aug 3 19:00:15 2012 @@ -135,8 +135,9 @@ class JobSubmitter { short replication) throws IOException { Configuration conf = job.getConfiguration(); if 
(!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) { - LOG.warn("Use GenericOptionsParser for parsing the arguments. " + - "Applications should implement Tool for the same."); + LOG.warn("Hadoop command-line option parsing not performed. " + + "Implement the Tool interface and execute your application " + + "with ToolRunner to remedy this."); } // get all the command line arguments passed in by the user conf @@ -189,7 +190,6 @@ class JobSubmitter { //should not throw a uri exception throw new IOException("Failed to create uri for " + tmpFile, ue); } - DistributedCache.createSymlink(conf); } } @@ -224,7 +224,6 @@ class JobSubmitter { //should not throw an uri excpetion throw new IOException("Failed to create uri for " + tmpArchives, ue); } - DistributedCache.createSymlink(conf); } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Aug 3 19:00:15 2012 @@ -79,4 +79,9 @@ public interface MRConfig { public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10; public static final String MAX_BLOCK_LOCATIONS_KEY = "mapreduce.job.max.split.locations"; + + public static final String SHUFFLE_SSL_ENABLED_KEY = + "mapreduce.shuffle.ssl.enabled"; + + public 
static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false; } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Aug 3 19:00:15 2012 @@ -114,6 +114,10 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; + /** + * @deprecated Symlinks are always on and cannot be disabled. 
+ */ + @Deprecated public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create"; public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours"; Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Fri Aug 3 19:00:15 2012 @@ -55,8 +55,12 @@ import java.net.URI; * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. * Jars may be optionally added to the classpath of the tasks, a rudimentary * software distribution mechanism. Files have execution permissions. - * Optionally users can also direct it to symlink the distributed cache file(s) - * into the working directory of the task.

+ * In older versions of Hadoop Map/Reduce users could optionally ask for symlinks + * to be created in the working directory of the child task. In the current + * version symlinks are always created. If the URL does not have a fragment + * the name of the file or directory will be used. If multiple files or + * directories map to the same link name, the last one added will be used. All + * others will not even be downloaded.

* *

DistributedCache tracks modification timestamps of the cache * files. Clearly the cache files should not be modified by the application @@ -98,8 +102,7 @@ import java.net.URI; * * public void configure(JobConf job) { * // Get the cached archives/files - * localArchives = DistributedCache.getLocalCacheArchives(job); - * localFiles = DistributedCache.getLocalCacheFiles(job); + * File f = new File("./map.zip/some/file/in/zip.txt"); * } * * public void map(K key, V value, @@ -375,32 +378,26 @@ public class DistributedCache { } /** - * This method allows you to create symlinks in the current working directory - * of the task to all the cache files/archives. - * Intended to be used by user code. + * Originally intended to enable symlinks, but currently symlinks cannot be + * disabled. This is a NO-OP. * @param conf the jobconf - * @deprecated Use {@link Job#createSymlink()} instead + * @deprecated This is a NO-OP. */ @Deprecated public static void createSymlink(Configuration conf){ - conf.set(MRJobConfig.CACHE_SYMLINK, "yes"); + //NOOP } /** - * This method checks to see if symlinks are to be create for the - * localized cache files in the current working directory - * Used by internal DistributedCache code. + * Originally intended to check if symlinks should be used, but currently + * symlinks cannot be disabled. * @param conf the jobconf - * @return true if symlinks are to be created- else return false - * @deprecated Use {@link JobContext#getSymlink()} instead + * @return true + * @deprecated symlinks are always created. 
*/ @Deprecated public static boolean getSymlink(Configuration conf){ - String result = conf.get(MRJobConfig.CACHE_SYMLINK); - if ("yes".equals(result)){ - return true; - } - return false; + return true; } private static boolean[] parseBooleans(String[] strs) { Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Aug 3 19:00:15 2012 @@ -21,15 +21,18 @@ import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; -import java.net.HttpURLConnection; import java.net.URLConnection; +import java.security.GeneralSecurityException; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; import javax.crypto.SecretKey; +import javax.net.ssl.HttpsURLConnection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,14 +45,17 @@ import org.apache.hadoop.mapred.Counters import org.apache.hadoop.mapred.IFileInputStream; import org.apache.hadoop.mapred.JobConf; import 
org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; -@SuppressWarnings({"deprecation"}) +import com.google.common.annotations.VisibleForTesting; + class Fetcher extends Thread { private static final Log LOG = LogFactory.getLog(Fetcher.class); @@ -92,6 +98,9 @@ class Fetcher extends Thread { private volatile boolean stopped = false; + private static boolean sslShuffle; + private static SSLFactory sslFactory; + public Fetcher(JobConf job, TaskAttemptID reduceId, ShuffleScheduler scheduler, MergeManager merger, Reporter reporter, ShuffleClientMetrics metrics, @@ -135,6 +144,20 @@ class Fetcher extends Thread { setName("fetcher#" + id); setDaemon(true); + + synchronized (Fetcher.class) { + sslShuffle = job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, + MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT); + if (sslShuffle && sslFactory == null) { + sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job); + try { + sslFactory.init(); + } catch (Exception ex) { + sslFactory.destroy(); + throw new RuntimeException(ex); + } + } + } } public void run() { @@ -173,15 +196,34 @@ class Fetcher extends Thread { } catch (InterruptedException ie) { LOG.warn("Got interrupt while joining " + getName(), ie); } + if (sslFactory != null) { + sslFactory.destroy(); + } } + @VisibleForTesting + protected HttpURLConnection openConnection(URL url) throws IOException { + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + if (sslShuffle) { + HttpsURLConnection httpsConn = (HttpsURLConnection) conn; + try { + httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory()); + } catch 
(GeneralSecurityException ex) { + throw new IOException(ex); + } + httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier()); + } + return conn; + } + /** * The crux of the matter... * * @param host {@link MapHost} from which we need to * shuffle available map-outputs. */ - private void copyFromHost(MapHost host) throws IOException { + @VisibleForTesting + protected void copyFromHost(MapHost host) throws IOException { // Get completed maps on 'host' List maps = scheduler.getMapsForHost(host); @@ -191,9 +233,9 @@ class Fetcher extends Thread { return; } - LOG.debug("Fetcher " + id + " going to fetch from " + host); - for (TaskAttemptID tmp: maps) { - LOG.debug(tmp); + if(LOG.isDebugEnabled()) { + LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: " + + maps); } // List of maps to be fetched yet @@ -205,7 +247,7 @@ class Fetcher extends Thread { try { URL url = getMapOutputURL(host, maps); - HttpURLConnection connection = (HttpURLConnection)url.openConnection(); + HttpURLConnection connection = openConnection(url); // generate hash of the url String msgToEncode = SecureShuffleUtils.buildMsgFrom(url); @@ -266,17 +308,25 @@ class Fetcher extends Thread { try { // Loop through available map-outputs and fetch them - // On any error, good becomes false and we exit after putting back - // the remaining maps to the yet_to_be_fetched list - boolean good = true; - while (!remaining.isEmpty() && good) { - good = copyMapOutput(host, input, remaining); + // On any error, faildTasks is not null and we exit + // after putting back the remaining maps to the + // yet_to_be_fetched list and marking the failed tasks. 
+ TaskAttemptID[] failedTasks = null; + while (!remaining.isEmpty() && failedTasks == null) { + failedTasks = copyMapOutput(host, input, remaining); + } + + if(failedTasks != null && failedTasks.length > 0) { + LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks)); + for(TaskAttemptID left: failedTasks) { + scheduler.copyFailed(left, host, true); + } } IOUtils.cleanup(LOG, input); // Sanity check - if (good && !remaining.isEmpty()) { + if (failedTasks == null && !remaining.isEmpty()) { throw new IOException("server didn't return all expected map outputs: " + remaining.size() + " left."); } @@ -285,10 +335,11 @@ class Fetcher extends Thread { scheduler.putBackKnownMapOutput(host, left); } } - - } + } + + private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0]; - private boolean copyMapOutput(MapHost host, + private TaskAttemptID[] copyMapOutput(MapHost host, DataInputStream input, Set remaining) { MapOutput mapOutput = null; @@ -310,18 +361,21 @@ class Fetcher extends Thread { } catch (IllegalArgumentException e) { badIdErrs.increment(1); LOG.warn("Invalid map id ", e); - return false; + //Don't know which one was bad, so consider all of them as bad + return remaining.toArray(new TaskAttemptID[remaining.size()]); } // Do some basic sanity verification if (!verifySanity(compressedLength, decompressedLength, forReduce, remaining, mapId)) { - return false; + return new TaskAttemptID[] {mapId}; } - LOG.debug("header: " + mapId + ", len: " + compressedLength + - ", decomp len: " + decompressedLength); + if(LOG.isDebugEnabled()) { + LOG.debug("header: " + mapId + ", len: " + compressedLength + + ", decomp len: " + decompressedLength); + } // Get the location for the map output - either in-memory or on-disk mapOutput = merger.reserve(mapId, decompressedLength, id); @@ -329,7 +383,8 @@ class Fetcher extends Thread { // Check if we can shuffle *now* ... 
if (mapOutput.getType() == Type.WAIT) { LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); - return false; + //Not an error but wait to process data. + return EMPTY_ATTEMPT_ID_ARRAY; } // Go! @@ -351,24 +406,27 @@ class Fetcher extends Thread { // Note successful shuffle remaining.remove(mapId); metrics.successFetch(); - return true; + return null; } catch (IOException ioe) { ioErrs.increment(1); if (mapId == null || mapOutput == null) { LOG.info("fetcher#" + id + " failed to read map header" + mapId + " decomp: " + decompressedLength + ", " + compressedLength, ioe); - return false; + if(mapId == null) { + return remaining.toArray(new TaskAttemptID[remaining.size()]); + } else { + return new TaskAttemptID[] {mapId}; + } } - LOG.info("Failed to shuffle output of " + mapId + + LOG.warn("Failed to shuffle output of " + mapId + " from " + host.getHostName(), ioe); // Inform the shuffle-scheduler mapOutput.abort(); - scheduler.copyFailed(mapId, host, true); metrics.failedFetch(); - return false; + return new TaskAttemptID[] {mapId}; } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Fri Aug 3 19:00:15 
2012 @@ -246,8 +246,6 @@ public class ConfigUtil { new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS}); Configuration.addDeprecation("mapred.cache.archives.timestamps", new String[] {MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS}); - Configuration.addDeprecation("mapred.create.symlink", - new String[] {MRJobConfig.CACHE_SYMLINK}); Configuration.addDeprecation("mapred.working.dir", new String[] {MRJobConfig.WORKING_DIR}); Configuration.addDeprecation("user.name", Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Aug 3 19:00:15 2012 @@ -513,6 +513,21 @@ + mapreduce.shuffle.ssl.enabled + false + + Whether to use SSL for for the Shuffle HTTP endpoints. + + + + + mapreduce.shuffle.ssl.file.buffer.size + 65536 + Buffer size for reading spills from file when using SSL. + + + + mapreduce.reduce.markreset.buffer.percent 0.0 The percentage of memory -relative to the maximum heap size- to @@ -692,7 +707,7 @@ - mapreduce.output.fileoutputformat.compression.type + mapreduce.output.fileoutputformat.compress.type RECORD If the job outputs are to compressed as SequenceFiles, how should they be compressed? Should be one of NONE, RECORD or BLOCK. 
@@ -700,7 +715,7 @@ - mapreduce.output.fileoutputformat.compression.codec + mapreduce.output.fileoutputformat.compress.codec org.apache.hadoop.io.compress.DefaultCodec If the job outputs are compressed, how should they be compressed? @@ -1285,7 +1300,7 @@ mapreduce.jobhistory.address 0.0.0.0:10020 - MapReduce JobHistory Server host:port + MapReduce JobHistory Server IPC host:port @@ -1294,4 +1309,21 @@ MapReduce JobHistory Server Web UI host:port + + mapreduce.jobhistory.keytab + + Location of the kerberos keytab file for the MapReduce + JobHistory Server. + + /etc/security/keytab/jhs.service.keytab + + + + mapreduce.jobhistory.principal + + Kerberos principal name for the MapReduce JobHistory Server. + + jhs/_HOST@REALM.TLD + + Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1358480-1369130 Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java (original) +++ 
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java Fri Aug 3 19:00:15 2012 @@ -35,16 +35,23 @@ import org.apache.hadoop.mapreduce.serve import junit.framework.TestCase; public class TestIndexCache extends TestCase { + private JobConf conf; + private FileSystem fs; + private Path p; + + @Override + public void setUp() throws IOException { + conf = new JobConf(); + fs = FileSystem.getLocal(conf).getRaw(); + p = new Path(System.getProperty("test.build.data", "/tmp"), + "cache").makeQualified(fs.getUri(), fs.getWorkingDirectory()); + } public void testLRCPolicy() throws Exception { Random r = new Random(); long seed = r.nextLong(); r.setSeed(seed); System.out.println("seed: " + seed); - JobConf conf = new JobConf(); - FileSystem fs = FileSystem.getLocal(conf).getRaw(); - Path p = new Path(System.getProperty("test.build.data", "/tmp"), - "cache").makeQualified(fs); fs.delete(p, true); conf.setInt(TTConfig.TT_INDEX_CACHE, 1); final int partsPerMap = 1000; @@ -115,10 +122,6 @@ public class TestIndexCache extends Test public void testBadIndex() throws Exception { final int parts = 30; - JobConf conf = new JobConf(); - FileSystem fs = FileSystem.getLocal(conf).getRaw(); - Path p = new Path(System.getProperty("test.build.data", "/tmp"), - "cache").makeQualified(fs); fs.delete(p, true); conf.setInt(TTConfig.TT_INDEX_CACHE, 1); IndexCache cache = new IndexCache(conf); @@ -150,10 +153,6 @@ public class TestIndexCache extends Test } public void testInvalidReduceNumberOrLength() throws Exception { - JobConf conf = new JobConf(); - FileSystem fs = FileSystem.getLocal(conf).getRaw(); - Path p = new Path(System.getProperty("test.build.data", "/tmp"), - "cache").makeQualified(fs); fs.delete(p, true); conf.setInt(TTConfig.TT_INDEX_CACHE, 1); final int partsPerMap = 1000; @@ -199,10 +198,6 @@ public class TestIndexCache extends Test // This test case may not 
repeatable. But on my macbook this test // fails with probability of 100% on code before MAPREDUCE-2541, // so it is repeatable in practice. - JobConf conf = new JobConf(); - FileSystem fs = FileSystem.getLocal(conf).getRaw(); - Path p = new Path(System.getProperty("test.build.data", "/tmp"), - "cache").makeQualified(fs); fs.delete(p, true); conf.setInt(TTConfig.TT_INDEX_CACHE, 10); // Make a big file so removeMapThread almost surely runs faster than @@ -247,6 +242,66 @@ public class TestIndexCache extends Test } } + public void testCreateRace() throws Exception { + fs.delete(p, true); + conf.setInt(TTConfig.TT_INDEX_CACHE, 1); + final int partsPerMap = 1000; + final int bytesPerFile = partsPerMap * 24; + final IndexCache cache = new IndexCache(conf); + + final Path racy = new Path(p, "racyIndex"); + final String user = + UserGroupInformation.getCurrentUser().getShortUserName(); + writeFile(fs, racy, bytesPerFile, partsPerMap); + + // run multiple instances + Thread[] getInfoThreads = new Thread[50]; + for (int i = 0; i < 50; i++) { + getInfoThreads[i] = new Thread() { + @Override + public void run() { + try { + cache.getIndexInformation("racyIndex", partsPerMap, racy, user); + cache.removeMap("racyIndex"); + } catch (Exception e) { + // should not be here + } + } + }; + } + + for (int i = 0; i < 50; i++) { + getInfoThreads[i].start(); + } + + final Thread mainTestThread = Thread.currentThread(); + + Thread timeoutThread = new Thread() { + @Override + public void run() { + try { + Thread.sleep(15000); + mainTestThread.interrupt(); + } catch (InterruptedException ie) { + // we are done; + } + } + }; + + for (int i = 0; i < 50; i++) { + try { + getInfoThreads[i].join(); + } catch (InterruptedException ie) { + // we haven't finished in time. Potential deadlock/race. + fail("Unexpectedly long delay during concurrent cache entry creations"); + } + } + // stop the timeoutThread. 
If we get interrupted before stopping, there + // must be something wrong, although it wasn't a deadlock. No need to + // catch and swallow. + timeoutThread.interrupt(); + } + private static void checkRecord(IndexRecord rec, long fill) { assertEquals(fill, rec.startOffset); assertEquals(fill, rec.rawLength); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Fri Aug 3 19:00:15 2012 @@ -336,7 +336,7 @@ public class HistoryFileManager extends public synchronized Configuration loadConfFile() throws IOException { FileContext fc = FileContext.getFileContext(confFile.toUri(), conf); Configuration jobConf = new Configuration(false); - jobConf.addResource(fc.open(confFile)); + jobConf.addResource(fc.open(confFile), confFile.toString()); return jobConf; } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1369164&r1=1369163&r2=1369164&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Fri Aug 3 19:00:15 2012 @@ -31,6 +31,7 @@ import org.apache.hadoop.security.Securi import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.service.CompositeService; @@ -122,6 +123,7 @@ public class JobHistoryServer extends Co } public static void main(String[] args) { + Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG); try { JobHistoryServer jobHistoryServer = new JobHistoryServer(); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java?rev=1369164&r1=1369163&r2=1369164&view=diff 
============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java Fri Aug 3 19:00:15 2012 @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.h import static org.apache.hadoop.yarn.util.StringHelper.pajoin; import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; @@ -60,10 +61,10 @@ public class HsWebApp extends WebApp imp route(pajoin("/singletaskcounter",TASK_ID, COUNTER_GROUP, COUNTER_NAME), HsController.class, "singleTaskCounter"); route("/about", HsController.class, "about"); - route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER), - HsController.class, "logs"); - route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER), - HsController.class, "nmlogs"); + route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER, + CONTAINER_LOG_TYPE), HsController.class, "logs"); + route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER, + CONTAINER_LOG_TYPE), HsController.class, "nmlogs"); } }