Subject: svn commit: r1390763 [2/3] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-clien...
Date: Wed, 26 Sep 2012 22:55:32 -0000
To: mapreduce-commits@hadoop.apache.org
From: suresh@apache.org

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Wed Sep 26 22:55:00 2012
@@ -19,9 +19,11 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -43,11 +45,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Records;
@@ -91,8 +96,6 @@ public class TestJobImpl {
when(mockJob.getCommitter()).thenReturn(mockCommitter);
when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
when(mockJob.getJobContext()).thenReturn(mockJobContext);
- doNothing().when(mockJob).setFinishTime();
- doNothing().when(mockJob).logJobHistoryFinishedEvent();
when(mockJob.finished(JobState.KILLED)).thenReturn(JobState.KILLED);
when(mockJob.finished(JobState.FAILED)).thenReturn(JobState.FAILED);
when(mockJob.finished(JobState.SUCCEEDED)).thenReturn(JobState.SUCCEEDED);
@@ -103,11 +106,13 @@ public class TestJobImpl {
// commitJob stubbed out, so this can't happen
}
doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+ JobState jobState = JobImpl.checkJobCompleteSuccess(mockJob);
Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
- "for successful job",
- JobImpl.checkJobCompleteSuccess(mockJob));
+ "for successful job", jobState);
Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
- JobState.FAILED, JobImpl.checkJobCompleteSuccess(mockJob));
+ JobState.FAILED, jobState);
+ verify(mockJob).abortJob(
+ eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
}
@Test
@@ -170,6 +175,8 @@ public class TestJobImpl {
t.testCheckJobCompleteSuccess();
t.testCheckJobCompleteSuccessFailed();
t.testCheckAccess();
+ t.testReportDiagnostics();
+ t.testUberDecision();
}
@Test
@@ -239,6 +246,41 @@ public class TestJobImpl {
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
+
+ @Test
+ public void testReportDiagnostics() throws Exception {
+ JobID jobID = JobID.forName("job_1234567890000_0001");
+ JobId jobId = TypeConverter.toYarn(jobID);
+ final String diagMsg = "some diagnostic message";
+ final JobDiagnosticsUpdateEvent diagUpdateEvent =
+ new JobDiagnosticsUpdateEvent(jobId, diagMsg);
+ MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+ JobImpl job = new JobImpl(jobId, Records
+ .newRecord(ApplicationAttemptId.class), new Configuration(),
+ mock(EventHandler.class),
+ null, mock(JobTokenSecretManager.class), null,
+ new SystemClock(), null,
+ mrAppMetrics, mock(OutputCommitter.class),
+ true, null, 0, null, null);
+ job.handle(diagUpdateEvent);
+ String diagnostics = job.getReport().getDiagnostics();
+ Assert.assertNotNull(diagnostics);
+ Assert.assertTrue(diagnostics.contains(diagMsg));
+
+ job = new JobImpl(jobId, Records
+ .newRecord(ApplicationAttemptId.class), new Configuration(),
+ mock(EventHandler.class),
+ null, mock(JobTokenSecretManager.class), null,
+ new SystemClock(), null,
+ mrAppMetrics, mock(OutputCommitter.class),
+ true, null, 0, null, null);
+ job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+ job.handle(diagUpdateEvent);
+ diagnostics = job.getReport().getDiagnostics();
+ Assert.assertNotNull(diagnostics);
+
Assert.assertTrue(diagnostics.contains(diagMsg)); + } + @Test public void testUberDecision() throws Exception { Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Wed Sep 26 22:55:00 2012 @@ -84,7 +84,6 @@ public class TestTaskImpl { private ApplicationId appId; private TaskSplitMetaInfo taskSplitMetaInfo; private String[] dataLocations = new String[0]; - private final TaskType taskType = TaskType.MAP; private AppContext appContext; private int startCount = 0; @@ -97,6 +96,7 @@ public class TestTaskImpl { private class MockTaskImpl extends TaskImpl { private int taskAttemptCounter = 0; + TaskType taskType; public MockTaskImpl(JobId jobId, int partition, EventHandler eventHandler, Path remoteJobConfFile, JobConf conf, @@ -104,11 +104,12 @@ public class TestTaskImpl { Token jobToken, Credentials credentials, Clock clock, Map completedTasksFromPreviousRun, int startCount, - MRAppMetrics metrics, AppContext appContext) { + MRAppMetrics metrics, AppContext appContext, TaskType taskType) { super(jobId, taskType , partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, committer, jobToken, credentials, clock, completedTasksFromPreviousRun, startCount, metrics, appContext); + this.taskType = taskType; } @Override @@ -120,7 +121,7 @@ public class TestTaskImpl { protected TaskAttemptImpl createAttempt() { MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, eventHandler, taskAttemptListener, remoteJobConfFile, partition, - conf, committer, jobToken, credentials, clock, appContext); + conf, committer, jobToken, credentials, clock, appContext, taskType); taskAttempts.add(attempt); return attempt; } @@ -142,18 +143,20 @@ public class TestTaskImpl { private float progress = 0; private TaskAttemptState state = TaskAttemptState.NEW; private TaskAttemptId attemptId; + private TaskType taskType; public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Path jobFile, int partition, JobConf conf, OutputCommitter committer, Token jobToken, Credentials credentials, Clock clock, - AppContext appContext) { + AppContext appContext, TaskType taskType) { super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf, dataLocations, committer, jobToken, credentials, clock, appContext); attemptId = Records.newRecord(TaskAttemptId.class); attemptId.setId(id); attemptId.setTaskId(taskId); + this.taskType = taskType; } public TaskAttemptId getAttemptId() { @@ -162,7 +165,7 @@ public class TestTaskImpl { @Override protected Task createRemoteTask() { - return new MockTask(); + return new MockTask(taskType); } 
public float getProgress() { @@ -185,6 +188,11 @@ public class TestTaskImpl { private class MockTask extends Task { + private TaskType taskType; + MockTask(TaskType taskType) { + this.taskType = taskType; + } + @Override public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { @@ -193,7 +201,7 @@ public class TestTaskImpl { @Override public boolean isMapTask() { - return true; + return (taskType == TaskType.MAP); } } @@ -227,14 +235,15 @@ public class TestTaskImpl { taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); - taskAttempts = new ArrayList(); - - mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), + taskAttempts = new ArrayList(); + } + + private MockTaskImpl createMockTask(TaskType taskType) { + return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), remoteJobConfFile, conf, taskAttemptListener, committer, jobToken, credentials, clock, completedTasksFromPreviousRun, startCount, - metrics, appContext); - + metrics, appContext, taskType); } @After @@ -342,6 +351,7 @@ public class TestTaskImpl { @Test public void testInit() { LOG.info("--- START: testInit ---"); + mockTask = createMockTask(TaskType.MAP); assertTaskNewState(); assert(taskAttempts.size() == 0); } @@ -352,6 +362,7 @@ public class TestTaskImpl { */ public void testScheduleTask() { LOG.info("--- START: testScheduleTask ---"); + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); } @@ -362,6 +373,7 @@ public class TestTaskImpl { */ public void testKillScheduledTask() { LOG.info("--- START: testKillScheduledTask ---"); + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); killTask(taskId); @@ -374,6 +386,7 @@ public class TestTaskImpl { */ public void testKillScheduledTaskAttempt() { LOG.info("--- START: testKillScheduledTaskAttempt ---"); + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); killScheduledTaskAttempt(getLastAttempt().getAttemptId()); @@ -386,6 +399,7 @@ public class TestTaskImpl { */ public void testLaunchTaskAttempt() { LOG.info("--- START: testLaunchTaskAttempt ---"); + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); launchTaskAttempt(getLastAttempt().getAttemptId()); @@ -398,6 +412,7 @@ public class TestTaskImpl { */ public void testKillRunningTaskAttempt() { LOG.info("--- START: testKillRunningTaskAttempt ---"); + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); launchTaskAttempt(getLastAttempt().getAttemptId()); @@ -407,6 +422,7 @@ public class TestTaskImpl { @Test public void testTaskProgress() { LOG.info("--- START: testTaskProgress ---"); + mockTask = createMockTask(TaskType.MAP); // launch task TaskId taskId = getNewTaskID(); @@ -444,6 +460,7 @@ public class TestTaskImpl { @Test public void testFailureDuringTaskAttemptCommit() { + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); launchTaskAttempt(getLastAttempt().getAttemptId()); @@ -469,8 +486,7 @@ public class TestTaskImpl { assertTaskSucceededState(); } - @Test - public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { + private void runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType failEvent) { TaskId taskId = getNewTaskID(); 
scheduleTaskAttempt(taskId); launchTaskAttempt(getLastAttempt().getAttemptId()); @@ -489,11 +505,34 @@ public class TestTaskImpl { // Now fail the first task attempt, after the second has succeeded mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + failEvent)); // The task should still be in the succeeded state assertTaskSucceededState(); - + } + + @Test + public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { + mockTask = createMockTask(TaskType.MAP); + runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED); + } + + @Test + public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { + mockTask = createMockTask(TaskType.REDUCE); + runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED); + } + + @Test + public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() { + mockTask = createMockTask(TaskType.MAP); + runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED); + } + + @Test + public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() { + mockTask = createMockTask(TaskType.REDUCE); + runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED); } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Wed Sep 26 22:55:00 2012 @@ -152,6 +152,10 @@ class LocalDistributedCacheManager { localArchives.add(pathString); } else if (resource.getType() == LocalResourceType.FILE) { localFiles.add(pathString); + } else if (resource.getType() == LocalResourceType.PATTERN) { + //PATTERN is not currently used in local mode + throw new IllegalArgumentException("Resource type PATTERN is not " + + "implemented yet. 
" + resource.getResource()); } Path resourcePath; try { Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Wed Sep 26 22:55:00 2012 @@ -61,11 +61,6 @@ public class JHAdminConfig { MR_HISTORY_PREFIX + "datestring.cache.size"; public static final int DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE = 200000; - //TODO REMOVE debug-mode - /** Equivalent to 0.20 mapreduce.jobhistory.debug.mode */ - public static final String MR_HISTORY_DEBUG_MODE = - MR_HISTORY_PREFIX + "debug-mode"; - /** Path where history files should be stored for DONE jobs. **/ public static final String MR_HISTORY_DONE_DIR = MR_HISTORY_PREFIX + "done-dir"; Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Wed Sep 26 22:55:00 2012 @@ -79,6 +79,13 @@ public class JobHistoryUtils { public static final FsPermission HISTORY_DONE_FILE_PERMISSION = FsPermission.createImmutable((short) 0770); // rwx------ + + /** + * Umask for the done dir and derivatives. + */ + public static final FsPermission HISTORY_DONE_DIR_UMASK = FsPermission + .createImmutable((short) (0770 ^ 0777)); + /** * Permissions for the intermediate done directory. @@ -336,20 +343,19 @@ public class JobHistoryUtils { /** * Gets the timestamp component based on millisecond time. 
* @param millisecondTime - * @param debugMode * @return the timestamp component based on millisecond time */ - public static String timestampDirectoryComponent(long millisecondTime, boolean debugMode) { + public static String timestampDirectoryComponent(long millisecondTime) { Calendar timestamp = Calendar.getInstance(); timestamp.setTimeInMillis(millisecondTime); String dateString = null; - dateString = String.format( - TIMESTAMP_DIR_FORMAT, - timestamp.get(Calendar.YEAR), - // months are 0-based in Calendar, but people will expect January - // to be month #1. - timestamp.get(debugMode ? Calendar.HOUR : Calendar.MONTH) + 1, - timestamp.get(debugMode ? Calendar.MINUTE : Calendar.DAY_OF_MONTH)); + dateString = String + .format(TIMESTAMP_DIR_FORMAT, + timestamp.get(Calendar.YEAR), + // months are 0-based in Calendar, but people will expect January to + // be month #1. + timestamp.get(Calendar.MONTH) + 1, + timestamp.get(Calendar.DAY_OF_MONTH)); dateString = dateString.intern(); return dateString; } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Wed Sep 26 22:55:00 2012 @@ -179,6 +179,12 @@ public class MRApps extends Apps { Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c .trim()); } + for (String c : conf.getStrings( + MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, + MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) { + Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c + .trim()); + } } finally { if (classpathFileStream != null) { classpathFileStream.close(); @@ -204,7 +210,7 @@ public class MRApps extends Apps { Apps.addToEnvironment( environment, Environment.CLASSPATH.name(), - MRJobConfig.JOB_JAR + Path.SEPARATOR); + MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR); Apps.addToEnvironment( environment, Environment.CLASSPATH.name(), @@ -275,7 +281,7 @@ public class MRApps extends Apps { } private static String getResourceDescription(LocalResourceType type) { - if(type == LocalResourceType.ARCHIVE) { + if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) { return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") "; } return "cache file (" + MRJobConfig.CACHE_FILES + ") "; Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1390763&r1=1390762&r2=1390763&view=diff 
============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Wed Sep 26 22:55:00 2012 @@ -67,7 +67,7 @@ public class MRBuilderUtils { String userName, JobState state, long submitTime, long startTime, long finishTime, float setupProgress, float mapProgress, float reduceProgress, float cleanupProgress, String jobFile, List amInfos, - boolean isUber) { + boolean isUber, String diagnostics) { JobReport report = Records.newRecord(JobReport.class); report.setJobId(jobId); report.setJobName(jobName); @@ -83,6 +83,7 @@ public class MRBuilderUtils { report.setJobFile(jobFile); report.setAMInfos(amInfos); report.setIsUber(isUber); + report.setDiagnostics(diagnostics); return report; } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Wed Sep 26 22:55:00 2012 @@ -140,11 +140,19 @@ public class TestMRApps { Map environment = new HashMap(); MRApps.setClasspath(environment, job.getConfiguration()); assertTrue(environment.get("CLASSPATH").startsWith("$PWD:")); - String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH); - if (confClasspath != null) { - confClasspath = confClasspath.replaceAll(",\\s*", ":").trim(); + String yarnAppClasspath = + job.getConfiguration().get( + YarnConfiguration.YARN_APPLICATION_CLASSPATH); + if (yarnAppClasspath != null) { + yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", ":").trim(); } - assertTrue(environment.get("CLASSPATH").contains(confClasspath)); + assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath)); + String mrAppClasspath = + job.getConfiguration().get(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH); + if (mrAppClasspath != null) { + mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", ":").trim(); + } + assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath)); } @Test public void testSetClasspathWithUserPrecendence() { @@ -158,7 +166,7 @@ public class TestMRApps { } String env_str = env.get("CLASSPATH"); assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!", - env_str.indexOf("$PWD:job.jar/:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0); + env_str.indexOf("$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0); } @Test public void testSetClasspathWithNoUserPrecendence() { @@ -172,7 +180,7 @@ public 
class TestMRApps { } String env_str = env.get("CLASSPATH"); int index = - env_str.indexOf("job.jar/:job.jar/classes/:job.jar/lib/*:$PWD/*"); + env_str.indexOf("job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*"); assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not" + " in the classpath!", index, -1); assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!", Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Wed Sep 26 22:55:00 2012 @@ -68,6 +68,24 @@ + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + target/generated-sources/avro + + + + + + org.apache.maven.plugins maven-antrun-plugin Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Wed Sep 26 22:55:00 2012 @@ -1357,7 +1357,7 @@ public class JobConf extends Configurati * @return the maximum no. of failures of a given job per tasktracker. 
*/ public int getMaxTaskFailuresPerTracker() { - return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 4); + return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3); } /** Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Wed Sep 26 22:55:00 2012 @@ -435,13 +435,9 @@ class JobSubmitter { private void printTokens(JobID jobId, Credentials credentials) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Printing tokens for job: " + jobId); - for(Token token: credentials.getAllTokens()) { - if (token.getKind().toString().equals("HDFS_DELEGATION_TOKEN")) { - LOG.debug("Submitting with " + token); - } - } + LOG.info("Submitting tokens for job: " + jobId); + for (Token token: credentials.getAllTokens()) { + LOG.info(token); } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Wed Sep 26 22:55:00 2012 @@ -587,4 +587,18 @@ public interface MRJobConfig { MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT = "security.job.client.protocol.acl"; + /** + * CLASSPATH for all YARN MapReduce applications. + */ + public static final String MAPREDUCE_APPLICATION_CLASSPATH = + "mapreduce.application.classpath"; + + /** + * Default CLASSPATH for all YARN MapReduce applications. 
+ */ + public static final String[] DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH = { + "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*", + "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*", + }; + } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Wed Sep 26 22:55:00 2012 @@ -520,5 +520,10 @@ public class ConfigUtil { MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT }); } + + public static void main(String[] args) { + loadResources(); + Configuration.dumpDeprecatedKeys(); + } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Wed Sep 26 22:55:00 2012 @@ -797,9 +797,12 @@ mapreduce.job.maxtaskfailures.per.tracker - 4 + 3 The number of task-failures on a tasktracker of a given job - after which new tasks of that job aren't assigned to it. + after which new tasks of that job aren't assigned to it. It + MUST be less than mapreduce.map.maxattempts and + mapreduce.reduce.maxattempts otherwise the failed task will + never be tried on a different node. @@ -1200,6 +1203,8 @@ + + mapreduce.job.counters.limit 120 @@ -1309,6 +1314,13 @@ The amount of memory the MR AppMaster needs. + + CLASSPATH for MR applications. 
A comma-separated list + of CLASSPATH entries + mapreduce.application.classpath + $HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/* + + Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1379224-1390762 Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Wed Sep 26 22:55:00 2012 @@ -358,7 +358,6 @@ public class HistoryFileManager extends private Configuration conf; - private boolean debugMode; private String serialNumberFormat; private Path doneDirPrefixPath = null; // folder for completed jobs @@ -379,8 +378,7 @@ public class HistoryFileManager extends public void init(Configuration conf) { this.conf = conf; - debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false); - int serialNumberLowDigits = debugMode ? 
1 : 3; + int serialNumberLowDigits = 3; serialNumberFormat = ("%0" + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) + "d"); @@ -392,6 +390,7 @@ public class HistoryFileManager extends doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( new Path(doneDirPrefix)); doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); + doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK); mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); } catch (IOException e) { @@ -779,8 +778,8 @@ public class HistoryFileManager extends } private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { - String timestampComponent = JobHistoryUtils.timestampDirectoryComponent( - millisecondTime, debugMode); + String timestampComponent = JobHistoryUtils + .timestampDirectoryComponent(millisecondTime); return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( id, timestampComponent, serialNumberFormat)); } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml Wed Sep 26 22:55:00 2012 @@ -102,7 +102,7 @@ dfs.permissions.enabledtrue mapreduce.tasktracker.taskcontrollerorg.apache.hadoop.mapred.DefaultTaskController mapreduce.reduce.shuffle.parallelcopies5 -yarn.nodemanager.env-whitelistJAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,YARN_HOME +yarn.nodemanager.env-whitelistJAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,HADOOP_YARN_HOME mapreduce.jobtracker.heartbeats.in.second100 mapreduce.job.maxtaskfailures.per.tracker4 ipc.client.connection.maxidletime10000 @@ -317,8 +317,8 @@ $HADOOP_COMMON_HOME/share/hadoop/common/lib/*, $HADOOP_HDFS_HOME/share/hadoop/hdfs/*, $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*, - $YARN_HOME/share/hadoop/mapreduce/*, - $YARN_HOME/share/hadoop/mapreduce/lib/* + $HADOOP_YARN_HOME/share/hadoop/mapreduce/*, + $HADOOP_YARN_HOME/share/hadoop/mapreduce/lib/* yarn.nodemanager.log-aggregation.compression-typegz dfs.image.compressfalse Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java 
(original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Wed Sep 26 22:55:00 2012 @@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ProtoUtils; -import org.hadoop.yarn.client.YarnClientImpl; +import org.apache.hadoop.yarn.client.YarnClientImpl; public class ResourceMgrDelegate extends YarnClientImpl { private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Wed Sep 26 22:55:00 2012 @@ -346,9 +346,13 @@ public class YARNRunner implements Clien jobConfPath, LocalResourceType.FILE)); if (jobConf.get(MRJobConfig.JAR) != null) { Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR)); - localResources.put(MRJobConfig.JOB_JAR, - createApplicationResource(defaultFileContext, - jobJarPath, LocalResourceType.ARCHIVE)); + LocalResource rc = createApplicationResource(defaultFileContext, + jobJarPath, + LocalResourceType.PATTERN); + String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, + JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); + rc.setPattern(pattern); + localResources.put(MRJobConfig.JOB_JAR, rc); } else { // Job jar may be null. For e.g, for pipes, the job jar is the hadoop // mapreduce jar itself which is already on the classpath. 
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java Wed Sep 26 22:55:00 2012 @@ -21,6 +21,7 @@ import org.junit.Assert; import org.junit.Test; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; public class TestJobConf { @@ -185,4 +186,19 @@ public class TestJobConf { } + + /** + * Ensure that by default JobContext.MAX_TASK_FAILURES_PER_TRACKER is less + * JobContext.MAP_MAX_ATTEMPTS and JobContext.REDUCE_MAX_ATTEMPTS so that + * failed tasks will be retried on other nodes + */ + @Test + public void testMaxTaskFailuresPerTracker() { + JobConf jobConf = new JobConf(true); + Assert.assertTrue("By default JobContext.MAX_TASK_FAILURES_PER_TRACKER was " + + "not less than JobContext.MAP_MAX_ATTEMPTS and REDUCE_MAX_ATTEMPTS" + ,jobConf.getMaxTaskFailuresPerTracker() < jobConf.getMaxMapAttempts() && + jobConf.getMaxTaskFailuresPerTracker() < jobConf.getMaxReduceAttempts() + ); + } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java Wed Sep 26 22:55:00 2012 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.fs; +import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; import org.apache.hadoop.conf.Configured; @@ -33,7 +34,6 @@ import org.apache.hadoop.mapred.*; * statistics data to be collected by subsequent reducers. * */ -@SuppressWarnings("deprecation") public abstract class IOMapperBase extends Configured implements Mapper { @@ -41,6 +41,7 @@ public abstract class IOMapperBase ex protected int bufferSize; protected FileSystem fs; protected String hostName; + protected Closeable stream; public IOMapperBase() { } @@ -79,6 +80,18 @@ public abstract class IOMapperBase ex long value) throws IOException; /** + * Create an input or output stream based on the specified file. + * Subclasses should override this method to provide an actual stream. 
+ * + * @param name file name + * @return the stream + * @throws IOException + */ + public Closeable getIOStream(String name) throws IOException { + return null; + } + + /** * Collect stat data to be combined by a subsequent reducer. * * @param output @@ -113,9 +126,15 @@ public abstract class IOMapperBase ex long longValue = value.get(); reporter.setStatus("starting " + name + " ::host = " + hostName); - + + this.stream = getIOStream(name); + T statValue = null; long tStart = System.currentTimeMillis(); - T statValue = doIO(reporter, name, longValue); + try { + statValue = doIO(reporter, name, longValue); + } finally { + if(stream != null) stream.close(); + } long tEnd = System.currentTimeMillis(); long execTime = tEnd - tStart; collectStats(output, name, execTime, statValue); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java Wed Sep 26 22:55:00 2012 @@ -19,18 +19,19 @@ package org.apache.hadoop.fs; import java.io.BufferedReader; +import java.io.Closeable; import java.io.DataInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintStream; import java.util.Date; +import java.util.Random; import java.util.StringTokenizer; -import junit.framework.TestCase; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,18 +39,30 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.mapred.*; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; /** * Distributed i/o benchmark. *

* This test writes into or reads from a specified number of files. - * File size is specified as a parameter to the test. + * Number of bytes to write or read is specified as a parameter to the test. * Each file is accessed in a separate map task. *

* The reducer collects the following statistics: @@ -72,24 +85,24 @@ import org.apache.hadoop.util.ToolRunner *

  • standard deviation of i/o rate
  • * */ -public class TestDFSIO extends TestCase implements Tool { +public class TestDFSIO implements Tool { // Constants private static final Log LOG = LogFactory.getLog(TestDFSIO.class); - private static final int TEST_TYPE_READ = 0; - private static final int TEST_TYPE_WRITE = 1; - private static final int TEST_TYPE_CLEANUP = 2; - private static final int TEST_TYPE_APPEND = 3; private static final int DEFAULT_BUFFER_SIZE = 1000000; private static final String BASE_FILE_NAME = "test_io_"; private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; private static final long MEGA = ByteMultiple.MB.value(); + private static final int DEFAULT_NR_BYTES = 1; + private static final int DEFAULT_NR_FILES = 4; private static final String USAGE = - "Usage: " + TestDFSIO.class.getSimpleName() + - " [genericOptions]" + - " -read | -write | -append | -clean [-nrFiles N]" + - " [-fileSize Size[B|KB|MB|GB|TB]]" + - " [-resFile resultFileName] [-bufferSize Bytes]" + - " [-rootDir]"; + "Usage: " + TestDFSIO.class.getSimpleName() + + " [genericOptions]" + + " -read [-random | -backward | -skip [-skipSize Size]] |" + + " -write | -append | -clean" + + " [-nrFiles N]" + + " [-size Size[B|KB|MB|GB|TB]]" + + " [-resFile resultFileName] [-bufferSize Bytes]" + + " [-rootDir]"; private Configuration config; @@ -100,6 +113,27 @@ public class TestDFSIO extends TestCase Configuration.addDefaultResource("mapred-site.xml"); } + private static enum TestType { + TEST_TYPE_READ("read"), + TEST_TYPE_WRITE("write"), + TEST_TYPE_CLEANUP("cleanup"), + TEST_TYPE_APPEND("append"), + TEST_TYPE_READ_RANDOM("random read"), + TEST_TYPE_READ_BACKWARD("backward read"), + TEST_TYPE_READ_SKIP("skip read"); + + private String type; + + private TestType(String t) { + type = t; + } + + @Override // String + public String toString() { + return type; + } + } + static enum ByteMultiple { B(1L), KB(0x400L), @@ -154,62 +188,100 @@ public class TestDFSIO extends TestCase private static Path getAppendDir(Configuration conf) { return new Path(getBaseDir(conf), "io_append"); } + private static Path getRandomReadDir(Configuration conf) { + return new Path(getBaseDir(conf), "io_random_read"); + } private static Path getDataDir(Configuration conf) { return new Path(getBaseDir(conf), "io_data"); } - /** - * Run the test with default parameters. - * - * @throws Exception - */ - public void testIOs() throws Exception { - TestDFSIO bench = new TestDFSIO(); - bench.testIOs(1, 4); + private static MiniDFSCluster cluster; + private static TestDFSIO bench; + + @BeforeClass + public static void beforeClass() throws Exception { + bench = new TestDFSIO(); + bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); + cluster = new MiniDFSCluster.Builder(bench.getConf()) + .numDataNodes(2) + .format(true) + .build(); + FileSystem fs = cluster.getFileSystem(); + bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES); + } + + @AfterClass + public static void afterClass() throws Exception { + if(cluster == null) + return; + FileSystem fs = cluster.getFileSystem(); + bench.cleanup(fs); + cluster.shutdown(); } - /** - * Run the test with the specified parameters. 
- * - * @param fileSize file size - * @param nrFiles number of files - * @throws IOException - */ - public void testIOs(int fileSize, int nrFiles) - throws IOException { - config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster(config, 2, true, null); - FileSystem fs = cluster.getFileSystem(); + @Test + public void testWrite() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.writeTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, execTime); + } - createControlFile(fs, fileSize, nrFiles); - long tStart = System.currentTimeMillis(); - writeTest(fs); - long execTime = System.currentTimeMillis() - tStart; - analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME); + @Test + public void testRead() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.readTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_READ, execTime); + } - tStart = System.currentTimeMillis(); - readTest(fs); - execTime = System.currentTimeMillis() - tStart; - analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME); + @Test + public void testReadRandom() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.getConf().setLong("test.io.skip.size", 0); + bench.randomReadTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, execTime); + } - tStart = System.currentTimeMillis(); - appendTest(fs); - execTime = System.currentTimeMillis() - tStart; - analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME); + @Test + public void testReadBackward() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.getConf().setLong("test.io.skip.size", -DEFAULT_BUFFER_SIZE); + bench.randomReadTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, execTime); + } - cleanup(fs); - } finally { - if(cluster != null) cluster.shutdown(); - } + @Test + public void testReadSkip() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.getConf().setLong("test.io.skip.size", 1); + bench.randomReadTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime); } + @Test + public void testAppend() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.appendTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime); + } + + @SuppressWarnings("deprecation") private void createControlFile(FileSystem fs, - long fileSize, // in bytes + long nrBytes, // in bytes int nrFiles ) throws IOException { - LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files"); + LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files"); Path controlDir = getControlDir(config); fs.delete(controlDir, true); @@ -222,7 +294,7 @@ public class TestDFSIO extends TestCase writer = SequenceFile.createWriter(fs, config, controlFile, Text.class, LongWritable.class, CompressionType.NONE); - writer.append(new 
Text(name), new LongWritable(fileSize)); + writer.append(new Text(name), new LongWritable(nrBytes)); } catch(Exception e) { throw new IOException(e.getLocalizedMessage()); } finally { @@ -250,10 +322,35 @@ public class TestDFSIO extends TestCase *
* <li>i/o rate squared</li>
  • * */ - private abstract static class IOStatMapper extends IOMapperBase { - IOStatMapper() { + private abstract static class IOStatMapper extends IOMapperBase { + protected CompressionCodec compressionCodec; + + IOStatMapper() { } - + + @Override // Mapper + public void configure(JobConf conf) { + super.configure(conf); + + // grab compression + String compression = getConf().get("test.io.compression.class", null); + Class codec; + + // try to initialize codec + try { + codec = (compression == null) ? null : + Class.forName(compression).asSubclass(CompressionCodec.class); + } catch(Exception e) { + throw new RuntimeException("Compression codec not found: ", e); + } + + if(codec != null) { + compressionCodec = (CompressionCodec) + ReflectionUtils.newInstance(codec, getConf()); + } + } + + @Override // IOMapperBase void collectStats(OutputCollector output, String name, long execTime, @@ -280,34 +377,38 @@ public class TestDFSIO extends TestCase /** * Write mapper class. */ - public static class WriteMapper extends IOStatMapper { + public static class WriteMapper extends IOStatMapper { public WriteMapper() { for(int i=0; i < bufferSize; i++) buffer[i] = (byte)('0' + i % 50); } - @Override + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + // create file + OutputStream out = + fs.create(new Path(getDataDir(getConf()), name), true, bufferSize); + if(compressionCodec != null) + out = compressionCodec.createOutputStream(out); + LOG.info("out = " + out.getClass().getName()); + return out; + } + + @Override // IOMapperBase public Long doIO(Reporter reporter, String name, long totalSize // in bytes ) throws IOException { - // create file - OutputStream out; - out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize); - - try { - // write to the file - long nrRemaining; - for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { - int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; - out.write(buffer, 0, curSize); - reporter.setStatus("writing " + name + "@" + - (totalSize - nrRemaining) + "/" + totalSize - + " ::host = " + hostName); - } - } finally { - out.close(); + OutputStream out = (OutputStream)this.stream; + // write to the file + long nrRemaining; + for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; + out.write(buffer, 0, curSize); + reporter.setStatus("writing " + name + "@" + + (totalSize - nrRemaining) + "/" + totalSize + + " ::host = " + hostName); } return Long.valueOf(totalSize); } @@ -321,7 +422,6 @@ public class TestDFSIO extends TestCase runIOTest(WriteMapper.class, writeDir); } - @SuppressWarnings("deprecation") private void runIOTest( Class> mapperClass, Path outputDir) throws IOException { @@ -343,33 +443,38 @@ public class TestDFSIO extends TestCase /** * Append mapper class. 
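The configure() override above pulls test.io.compression.class from the job configuration and instantiates the codec reflectively, and getIOStream() then wraps the raw file stream with it. A minimal standalone sketch of that same lookup-and-wrap pattern, assuming nothing beyond a local filesystem; GzipCodec, the class name string, and the output path are illustrative choices, not values fixed by the patch:

  import java.io.OutputStream;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.compress.CompressionCodec;
  import org.apache.hadoop.util.ReflectionUtils;

  public class CodecLookupSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // same configuration key the patch reads in IOStatMapper.configure()
      conf.set("test.io.compression.class",
          "org.apache.hadoop.io.compress.GzipCodec");

      // resolve and instantiate the codec reflectively, as configure() does
      Class<? extends CompressionCodec> codecClass =
          Class.forName(conf.get("test.io.compression.class"))
              .asSubclass(CompressionCodec.class);
      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);

      // wrap a raw output stream, mirroring WriteMapper.getIOStream()
      FileSystem fs = FileSystem.getLocal(conf);
      OutputStream out = fs.create(new Path("/tmp/codec_sketch.gz"));
      out = codec.createOutputStream(out);
      out.write("compressed benchmark payload".getBytes("UTF-8"));
      out.close();
    }
  }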
*/ - public static class AppendMapper extends IOStatMapper { + public static class AppendMapper extends IOStatMapper { public AppendMapper() { for(int i=0; i < bufferSize; i++) buffer[i] = (byte)('0' + i % 50); } + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + // open file for append + OutputStream out = + fs.append(new Path(getDataDir(getConf()), name), bufferSize); + if(compressionCodec != null) + out = compressionCodec.createOutputStream(out); + LOG.info("out = " + out.getClass().getName()); + return out; + } + + @Override // IOMapperBase public Long doIO(Reporter reporter, String name, long totalSize // in bytes ) throws IOException { - // create file - OutputStream out; - out = fs.append(new Path(getDataDir(getConf()), name), bufferSize); - - try { - // write to the file - long nrRemaining; - for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { - int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; - out.write(buffer, 0, curSize); - reporter.setStatus("writing " + name + "@" + - (totalSize - nrRemaining) + "/" + totalSize - + " ::host = " + hostName); - } - } finally { - out.close(); + OutputStream out = (OutputStream)this.stream; + // write to the file + long nrRemaining; + for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; + out.write(buffer, 0, curSize); + reporter.setStatus("writing " + name + "@" + + (totalSize - nrRemaining) + "/" + totalSize + + " ::host = " + hostName); } return Long.valueOf(totalSize); } @@ -384,29 +489,35 @@ public class TestDFSIO extends TestCase /** * Read mapper class. */ - public static class ReadMapper extends IOStatMapper { + public static class ReadMapper extends IOStatMapper { public ReadMapper() { } + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + // open file + InputStream in = fs.open(new Path(getDataDir(getConf()), name)); + if(compressionCodec != null) + in = compressionCodec.createInputStream(in); + LOG.info("in = " + in.getClass().getName()); + return in; + } + + @Override // IOMapperBase public Long doIO(Reporter reporter, String name, long totalSize // in bytes ) throws IOException { - // open file - DataInputStream in = fs.open(new Path(getDataDir(getConf()), name)); + InputStream in = (InputStream)this.stream; long actualSize = 0; - try { - while (actualSize < totalSize) { - int curSize = in.read(buffer, 0, bufferSize); - if(curSize < 0) break; - actualSize += curSize; - reporter.setStatus("reading " + name + "@" + - actualSize + "/" + totalSize - + " ::host = " + hostName); - } - } finally { - in.close(); + while (actualSize < totalSize) { + int curSize = in.read(buffer, 0, bufferSize); + if(curSize < 0) break; + actualSize += curSize; + reporter.setStatus("reading " + name + "@" + + actualSize + "/" + totalSize + + " ::host = " + hostName); } return Long.valueOf(actualSize); } @@ -418,20 +529,111 @@ public class TestDFSIO extends TestCase runIOTest(ReadMapper.class, readDir); } + /** + * Mapper class for random reads. + * The mapper chooses a position in the file and reads bufferSize + * bytes starting at the chosen position. + * It stops after reading the totalSize bytes, specified by -size. + * + * There are three type of reads. 
+ * 1) Random read always chooses a random position to read from: skipSize = 0 + * 2) Backward read reads file in reverse order : skipSize < 0 + * 3) Skip-read skips skipSize bytes after every read : skipSize > 0 + */ + public static class RandomReadMapper extends IOStatMapper { + private Random rnd; + private long fileSize; + private long skipSize; + + @Override // Mapper + public void configure(JobConf conf) { + super.configure(conf); + skipSize = conf.getLong("test.io.skip.size", 0); + } + + public RandomReadMapper() { + rnd = new Random(); + } + + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + Path filePath = new Path(getDataDir(getConf()), name); + this.fileSize = fs.getFileStatus(filePath).getLen(); + InputStream in = fs.open(filePath); + if(compressionCodec != null) + in = new FSDataInputStream(compressionCodec.createInputStream(in)); + LOG.info("in = " + in.getClass().getName()); + LOG.info("skipSize = " + skipSize); + return in; + } + + @Override // IOMapperBase + public Long doIO(Reporter reporter, + String name, + long totalSize // in bytes + ) throws IOException { + PositionedReadable in = (PositionedReadable)this.stream; + long actualSize = 0; + for(long pos = nextOffset(-1); + actualSize < totalSize; pos = nextOffset(pos)) { + int curSize = in.read(pos, buffer, 0, bufferSize); + if(curSize < 0) break; + actualSize += curSize; + reporter.setStatus("reading " + name + "@" + + actualSize + "/" + totalSize + + " ::host = " + hostName); + } + return Long.valueOf(actualSize); + } + + /** + * Get next offset for reading. + * If current < 0 then choose initial offset according to the read type. + * + * @param current offset + * @return + */ + private long nextOffset(long current) { + if(skipSize == 0) + return rnd.nextInt((int)(fileSize)); + if(skipSize > 0) + return (current < 0) ? 0 : (current + bufferSize + skipSize); + // skipSize < 0 + return (current < 0) ? 
Math.max(0, fileSize - bufferSize) : + Math.max(0, current + skipSize); + } + } + + private void randomReadTest(FileSystem fs) throws IOException { + Path readDir = getRandomReadDir(config); + fs.delete(readDir, true); + runIOTest(RandomReadMapper.class, readDir); + } + private void sequentialTest(FileSystem fs, - int testType, + TestType testType, long fileSize, // in bytes int nrFiles ) throws IOException { - IOStatMapper ioer = null; - if (testType == TEST_TYPE_READ) + IOStatMapper ioer = null; + switch(testType) { + case TEST_TYPE_READ: ioer = new ReadMapper(); - else if (testType == TEST_TYPE_WRITE) + break; + case TEST_TYPE_WRITE: ioer = new WriteMapper(); - else if (testType == TEST_TYPE_APPEND) + break; + case TEST_TYPE_APPEND: ioer = new AppendMapper(); - else + break; + case TEST_TYPE_READ_RANDOM: + case TEST_TYPE_READ_BACKWARD: + case TEST_TYPE_READ_SKIP: + ioer = new RandomReadMapper(); + break; + default: return; + } for(int i=0; i < nrFiles; i++) ioer.doIO(Reporter.NULL, BASE_FILE_NAME+Integer.toString(i), @@ -454,13 +656,15 @@ public class TestDFSIO extends TestCase @Override // Tool public int run(String[] args) throws IOException { - int testType = TEST_TYPE_READ; + TestType testType = null; int bufferSize = DEFAULT_BUFFER_SIZE; - long fileSize = 1*MEGA; + long nrBytes = 1*MEGA; int nrFiles = 1; + long skipSize = 0; String resFileName = DEFAULT_RES_FILE_NAME; + String compressionClass = null; boolean isSequential = false; - String version = TestDFSIO.class.getSimpleName() + ".0.0.6"; + String version = TestDFSIO.class.getSimpleName() + ".1.7"; LOG.info(version); if (args.length == 0) { @@ -470,19 +674,32 @@ public class TestDFSIO extends TestCase for (int i = 0; i < args.length; i++) { // parse command line if (args[i].startsWith("-read")) { - testType = TEST_TYPE_READ; + testType = TestType.TEST_TYPE_READ; } else if (args[i].equals("-write")) { - testType = TEST_TYPE_WRITE; + testType = TestType.TEST_TYPE_WRITE; } else if (args[i].equals("-append")) { - testType = TEST_TYPE_APPEND; + testType = TestType.TEST_TYPE_APPEND; + } else if (args[i].equals("-random")) { + if(testType != TestType.TEST_TYPE_READ) return -1; + testType = TestType.TEST_TYPE_READ_RANDOM; + } else if (args[i].equals("-backward")) { + if(testType != TestType.TEST_TYPE_READ) return -1; + testType = TestType.TEST_TYPE_READ_BACKWARD; + } else if (args[i].equals("-skip")) { + if(testType != TestType.TEST_TYPE_READ) return -1; + testType = TestType.TEST_TYPE_READ_SKIP; } else if (args[i].equals("-clean")) { - testType = TEST_TYPE_CLEANUP; + testType = TestType.TEST_TYPE_CLEANUP; } else if (args[i].startsWith("-seq")) { isSequential = true; + } else if (args[i].startsWith("-compression")) { + compressionClass = args[++i]; } else if (args[i].equals("-nrFiles")) { nrFiles = Integer.parseInt(args[++i]); - } else if (args[i].equals("-fileSize")) { - fileSize = parseSize(args[++i]); + } else if (args[i].equals("-fileSize") || args[i].equals("-size")) { + nrBytes = parseSize(args[++i]); + } else if (args[i].equals("-skipSize")) { + skipSize = parseSize(args[++i]); } else if (args[i].equals("-bufferSize")) { bufferSize = Integer.parseInt(args[++i]); } else if (args[i].equals("-resFile")) { @@ -492,36 +709,59 @@ public class TestDFSIO extends TestCase return -1; } } + if(testType == null) + return -1; + if(testType == TestType.TEST_TYPE_READ_BACKWARD) + skipSize = -bufferSize; + else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) + skipSize = bufferSize; LOG.info("nrFiles = " + nrFiles); - 
LOG.info("fileSize (MB) = " + toMB(fileSize)); + LOG.info("nrBytes (MB) = " + toMB(nrBytes)); LOG.info("bufferSize = " + bufferSize); + if(skipSize > 0) + LOG.info("skipSize = " + skipSize); LOG.info("baseDir = " + getBaseDir(config)); + + if(compressionClass != null) { + config.set("test.io.compression.class", compressionClass); + LOG.info("compressionClass = " + compressionClass); + } config.setInt("test.io.file.buffer.size", bufferSize); + config.setLong("test.io.skip.size", skipSize); config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); FileSystem fs = FileSystem.get(config); if (isSequential) { long tStart = System.currentTimeMillis(); - sequentialTest(fs, testType, fileSize, nrFiles); + sequentialTest(fs, testType, nrBytes, nrFiles); long execTime = System.currentTimeMillis() - tStart; String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; LOG.info(resultLine); return 0; } - if (testType == TEST_TYPE_CLEANUP) { + if (testType == TestType.TEST_TYPE_CLEANUP) { cleanup(fs); return 0; } - createControlFile(fs, fileSize, nrFiles); + createControlFile(fs, nrBytes, nrFiles); long tStart = System.currentTimeMillis(); - if (testType == TEST_TYPE_WRITE) + switch(testType) { + case TEST_TYPE_WRITE: writeTest(fs); - if (testType == TEST_TYPE_READ) + break; + case TEST_TYPE_READ: readTest(fs); - if (testType == TEST_TYPE_APPEND) + break; + case TEST_TYPE_APPEND: appendTest(fs); + break; + case TEST_TYPE_READ_RANDOM: + case TEST_TYPE_READ_BACKWARD: + case TEST_TYPE_READ_SKIP: + randomReadTest(fs); + } long execTime = System.currentTimeMillis() - tStart; analyzeResult(fs, testType, execTime, resFileName); @@ -547,9 +787,9 @@ public class TestDFSIO extends TestCase static long parseSize(String arg) { String[] args = arg.split("\\D", 2); // get digits assert args.length <= 2; - long fileSize = Long.parseLong(args[0]); + long nrBytes = Long.parseLong(args[0]); String bytesMult = arg.substring(args[0].length()); // get byte multiple - return fileSize * ByteMultiple.parseString(bytesMult).value(); + return nrBytes * ByteMultiple.parseString(bytesMult).value(); } static float toMB(long bytes) { @@ -557,17 +797,11 @@ public class TestDFSIO extends TestCase } private void analyzeResult( FileSystem fs, - int testType, + TestType testType, long execTime, String resFileName ) throws IOException { - Path reduceFile; - if (testType == TEST_TYPE_WRITE) - reduceFile = new Path(getWriteDir(config), "part-00000"); - else if (testType == TEST_TYPE_APPEND) - reduceFile = new Path(getAppendDir(config), "part-00000"); - else // if (testType == TEST_TYPE_READ) - reduceFile = new Path(getReadDir(config), "part-00000"); + Path reduceFile = getReduceFilePath(testType); long tasks = 0; long size = 0; long time = 0; @@ -601,10 +835,7 @@ public class TestDFSIO extends TestCase double med = rate / 1000 / tasks; double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med)); String resultLines[] = { - "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : - (testType == TEST_TYPE_READ) ? "read" : - (testType == TEST_TYPE_APPEND) ? 
"append" : - "unknown"), + "----- TestDFSIO ----- : " + testType, " Date & time: " + new Date(System.currentTimeMillis()), " Number of files: " + tasks, "Total MBytes processed: " + toMB(size), @@ -626,6 +857,27 @@ public class TestDFSIO extends TestCase } } + private Path getReduceFilePath(TestType testType) { + switch(testType) { + case TEST_TYPE_WRITE: + return new Path(getWriteDir(config), "part-00000"); + case TEST_TYPE_APPEND: + return new Path(getAppendDir(config), "part-00000"); + case TEST_TYPE_READ: + return new Path(getReadDir(config), "part-00000"); + case TEST_TYPE_READ_RANDOM: + case TEST_TYPE_READ_BACKWARD: + case TEST_TYPE_READ_SKIP: + return new Path(getRandomReadDir(config), "part-00000"); + } + return null; + } + + private void analyzeResult(FileSystem fs, TestType testType, long execTime) + throws IOException { + analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME); + } + private void cleanup(FileSystem fs) throws IOException { LOG.info("Cleaning up test files"); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Operation.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Operation.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Operation.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Operation.java Wed Sep 26 22:55:00 2012 @@ -41,7 +41,9 @@ abstract class Operation { this.config = cfg; this.type = type; this.rnd = rnd; - this.finder = new PathFinder(cfg, rnd); + // Use a new Random instance so that the sequence of file names produced is + // the same even in case of unsuccessful operations + this.finder = new PathFinder(cfg, new Random(rnd.nextInt())); } /** Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/SliveMapper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/SliveMapper.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/SliveMapper.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/SliveMapper.java Wed Sep 26 22:55:00 2012 @@ -32,6 +32,8 @@ import org.apache.hadoop.mapred.MapReduc import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.util.StringUtils; /** @@ 
-50,8 +52,7 @@ public class SliveMapper extends MapRedu private FileSystem filesystem; private ConfigExtractor config; - private WeightSelector selector; - private Random rnd; + private int taskId; /* * (non-Javadoc) @@ -70,19 +71,19 @@ public class SliveMapper extends MapRedu } try { config = new ConfigExtractor(conf); - Long rndSeed = config.getRandomSeed(); - if (rndSeed != null) { - rnd = new Random(rndSeed); - } else { - rnd = new Random(); - } - selector = new WeightSelector(config, rnd); ConfigExtractor.dumpOptions(config); } catch (Exception e) { LOG.error("Unable to setup slive " + StringUtils.stringifyException(e)); throw new RuntimeException("Unable to setup slive configuration", e); } - + if(conf.get(MRJobConfig.TASK_ATTEMPT_ID) != null ) { + this.taskId = TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID)) + .getTaskID().getId(); + } else { + // So that branch-1/0.20 can run this same code as well + this.taskId = TaskAttemptID.forName(conf.get("mapred.task.id")) + .getTaskID().getId(); + } } /** @@ -95,15 +96,6 @@ public class SliveMapper extends MapRedu } /** - * Gets the operation selector to use for this object - * - * @return WeightSelector - */ - private WeightSelector getSelector() { - return selector; - } - - /** * Logs to the given reporter and logs to the internal logger at info level * * @param r @@ -154,6 +146,10 @@ public class SliveMapper extends MapRedu Reporter reporter) throws IOException { logAndSetStatus(reporter, "Running slive mapper for dummy key " + key + " and dummy value " + value); + //Add taskID to randomSeed to deterministically seed rnd. + Random rnd = config.getRandomSeed() != null ? + new Random(this.taskId + config.getRandomSeed()) : new Random(); + WeightSelector selector = new WeightSelector(config, rnd); long startTime = Timer.now(); long opAm = 0; long sleepOps = 0; @@ -163,7 +159,6 @@ public class SliveMapper extends MapRedu if (sleepRange != null) { sleeper = new SleepOp(getConfig(), rnd); } - WeightSelector selector = getSelector(); while (Timer.elapsed(startTime) < duration) { try { logAndSetStatus(reporter, "Attempting to select operation #" Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java Wed Sep 26 22:55:00 2012 @@ -58,8 +58,12 @@ public class MiniMRClientClusterFactory Job job = Job.getInstance(conf); job.addFileToClassPath(appJar); - String callerJar = JarFinder.getJar(caller); - job.setJar(callerJar); + + Path callerJar = new Path(JarFinder.getJar(caller)); + Path remoteCallerJar = new Path(testRootDir, callerJar.getName()); + fs.copyFromLocalFile(callerJar, remoteCallerJar); + fs.setPermission(remoteCallerJar, new 
FsPermission("744")); + job.addFileToClassPath(remoteCallerJar); MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller .getName(), noOfNMs); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1390763&r1=1390762&r2=1390763&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Wed Sep 26 22:55:00 2012 @@ -219,7 +219,8 @@ public class TestClientServiceDelegate { GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class); when(jobReportResponse1.getJobReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user", - JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false)); + JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, + false, "")); // First AM returns a report with jobName firstGen and simulates AM shutdown // on second invocation. @@ -231,7 +232,8 @@ public class TestClientServiceDelegate { GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class); when(jobReportResponse2.getJobReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user", - JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false)); + JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, + false, "")); // Second AM generation returns a report with jobName secondGen MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);