Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A454E899B for ; Thu, 8 Sep 2011 01:39:58 +0000 (UTC) Received: (qmail 5565 invoked by uid 500); 8 Sep 2011 01:39:58 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 5059 invoked by uid 500); 8 Sep 2011 01:39:57 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 5043 invoked by uid 99); 8 Sep 2011 01:39:57 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Sep 2011 01:39:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Sep 2011 01:39:52 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9A4F42388ACC; Thu, 8 Sep 2011 01:39:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1166495 [2/6] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai... Date: Thu, 08 Sep 2011 01:39:23 -0000 To: mapreduce-commits@hadoop.apache.org From: todd@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110908013932.9A4F42388ACC@eris.apache.org> Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Sep 8 01:39:07 2011 @@ -218,7 +218,14 @@ public class MRApps extends Apps { private static final String STAGING_CONSTANT = ".staging"; public static Path getStagingAreaDir(Configuration conf, String user) { return new Path( - conf.get(MRConstants.APPS_STAGING_DIR_KEY) + + conf.get(MRConstants.APPS_STAGING_DIR_KEY) + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT); } + + public static String getJobFile(Configuration conf, String user, + org.apache.hadoop.mapreduce.JobID jobId) { + Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user), + jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE); + return jobFile.toString(); + } } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Thu Sep 8 01:39:07 2011 @@ -139,6 +139,8 @@ message JobReportProto { optional float setup_progress = 6; optional int64 start_time = 7; optional int64 finish_time = 8; + optional string user = 9; + optional string jobName = 10; } enum TaskAttemptCompletionEventStatusProto { Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Thu Sep 8 01:39:07 2011 @@ -21,8 +21,11 @@ import junit.framework.Assert; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationState; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.junit.Test; public class TestTypeConverter { @@ -35,8 +38,33 @@ public class TestTypeConverter { applicationReport.setApplicationId(applicationId); applicationReport.setState(state); applicationReport.setStartTime(appStartTime); - JobStatus jobStatus = TypeConverter.fromYarn(applicationReport); + applicationReport.setUser("TestTypeConverter-user"); + JobStatus jobStatus = TypeConverter.fromYarn(applicationReport, "dummy-jobfile"); Assert.assertEquals(appStartTime, jobStatus.getStartTime()); Assert.assertEquals(state.toString(), jobStatus.getState().toString()); } + + @Test + public void testFromYarnApplicationReport() { + ApplicationId mockAppId = mock(ApplicationId.class); + when(mockAppId.getClusterTimestamp()).thenReturn(12345L); + when(mockAppId.getId()).thenReturn(6789); + + ApplicationReport mockReport = mock(ApplicationReport.class); + when(mockReport.getTrackingUrl()).thenReturn("dummy-tracking-url"); + when(mockReport.getApplicationId()).thenReturn(mockAppId); + when(mockReport.getState()).thenReturn(ApplicationState.KILLED); + when(mockReport.getUser()).thenReturn("dummy-user"); + when(mockReport.getQueue()).thenReturn("dummy-queue"); + String jobFile = "dummy-path/job.xml"; + JobStatus status = TypeConverter.fromYarn(mockReport, jobFile); + Assert.assertNotNull("fromYarn returned null status", status); + Assert.assertEquals("jobFile set incorrectly", "dummy-path/job.xml", status.getJobFile()); + Assert.assertEquals("queue set incorrectly", "dummy-queue", status.getQueue()); + Assert.assertEquals("trackingUrl set incorrectly", "dummy-tracking-url", status.getTrackingUrl()); + Assert.assertEquals("user set incorrectly", "dummy-user", status.getUsername()); + Assert.assertEquals("schedulingInfo set incorrectly", "dummy-tracking-url", status.getSchedulingInfo()); + Assert.assertEquals("jobId set incorrectly", 6789, status.getJobID().getId()); + Assert.assertEquals("state set incorrectly", JobStatus.State.KILLED, status.getState()); + } } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Sep 8 01:39:07 2011 @@ -18,10 +18,13 @@ package org.apache.hadoop.mapreduce.v2.util; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -107,4 +110,14 @@ public class TestMRApps { @Test(expected=YarnException.class) public void testTaskAttemptIDShort() { MRApps.toTaskAttemptID("attempt_0_0_0_m_0"); } + + @Test public void testGetJobFileWithUser() { + Configuration conf = new Configuration(); + conf.set(MRConstants.APPS_STAGING_DIR_KEY, "/my/path/to/staging"); + String jobFile = MRApps.getJobFile(conf, "dummy-user", new JobID("dummy-job", 12345)); + assertNotNull("getJobFile results in null.", jobFile); + assertEquals("jobFile with specified user is not as expected.", + "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile); + } + } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Thu Sep 8 01:39:07 2011 @@ -44,7 +44,7 @@ org.apache.avro avro-maven-plugin - 1.5.2 + 1.5.3 generate-sources Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java Thu Sep 8 01:39:07 2011 @@ -372,7 +372,7 @@ public class Counters * @param id the id of the counter within the group (0 to N-1) * @param name the internal name of the counter * @return the counter for that name - * @deprecated use {@link findCounter(String, String)} instead + * @deprecated use {@link #findCounter(String, String)} instead */ @Deprecated public Counter findCounter(String group, int id, String name) { Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Thu Sep 8 01:39:07 2011 @@ -49,7 +49,7 @@ import org.apache.hadoop.util.ToolRunner /** * JobClient is the primary interface for the user-job to interact - * with the {@link JobTracker}. + * with the cluster. * * JobClient provides facilities to submit jobs, track their * progress, access component-tasks' reports/logs, get the Map-Reduce cluster @@ -72,7 +72,7 @@ import org.apache.hadoop.util.ToolRunner * on the distributed file-system. * *
  • - * Submitting the job to the JobTracker and optionally monitoring + * Submitting the job to the cluster and optionally monitoring * it's status. *
  • *

    @@ -152,7 +152,7 @@ public class JobClient extends CLI { /** * We store a JobProfile and a timestamp for when we last * acquired the job profile. If the job is null, then we cannot - * perform any of the tasks. The job might be null if the JobTracker + * perform any of the tasks. The job might be null if the cluster * has completely forgotten about the job. (eg, 24 hours after the * job completes.) */ @@ -348,7 +348,7 @@ public class JobClient extends CLI { } /** - * Fetch task completion events from jobtracker for this job. + * Fetch task completion events from cluster for this job. */ public synchronized TaskCompletionEvent[] getTaskCompletionEvents( int startFrom) throws IOException { @@ -429,7 +429,7 @@ public class JobClient extends CLI { /** * Build a job client with the given {@link JobConf}, and connect to the - * default {@link JobTracker}. + * default cluster * * @param conf the job configuration. * @throws IOException @@ -440,7 +440,7 @@ public class JobClient extends CLI { /** * Build a job client with the given {@link Configuration}, - * and connect to the default {@link JobTracker}. + * and connect to the default cluster * * @param conf the configuration. * @throws IOException @@ -450,7 +450,7 @@ public class JobClient extends CLI { } /** - * Connect to the default {@link JobTracker}. + * Connect to the default cluster * @param conf the job configuration. * @throws IOException */ Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Thu Sep 8 01:39:07 2011 @@ -476,7 +476,6 @@ public class JobConf extends Configurati /** * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. - * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes() */ @Deprecated public void deleteLocalFiles() throws IOException { Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Thu Sep 8 01:39:07 2011 @@ -1736,6 +1736,7 @@ class MapTask extends Task { indexCacheList.get(0).writeToFile( mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job); } + sortPhase.complete(); return; } @@ -1776,6 +1777,7 @@ class MapTask extends Task { } finally { finalOut.close(); } + sortPhase.complete(); return; } { Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TIPStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TIPStatus.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TIPStatus.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TIPStatus.java Thu Sep 8 01:39:07 2011 @@ -20,7 +20,7 @@ package org.apache.hadoop.mapred; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -/** The states of a {@link TaskInProgress} as seen by the JobTracker. +/** The states of a Tasks. */ @InterfaceAudience.Private @InterfaceStability.Unstable Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Thu Sep 8 01:39:07 2011 @@ -1124,8 +1124,14 @@ public class Job extends JobContextImpl IntegerRanges reduceRanges = getProfileTaskRange(false); int progMonitorPollIntervalMillis = Job.getProgressPollInterval(clientConf); - while (!isComplete()) { - Thread.sleep(progMonitorPollIntervalMillis); + /* make sure to report full progress after the job is done */ + boolean reportedAfterCompletion = false; + while (!isComplete() || !reportedAfterCompletion) { + if (isComplete()) { + reportedAfterCompletion = true; + } else { + Thread.sleep(progMonitorPollIntervalMillis); + } String report = (" map " + StringUtils.formatPercent(mapProgress(), 0)+ " reduce " + Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java Thu Sep 8 01:39:07 2011 @@ -43,8 +43,6 @@ import org.apache.hadoop.io.Text; * * @see TaskID * @see TaskAttemptID - * @see org.apache.hadoop.mapred.JobTracker#getNewJobId() - * @see org.apache.hadoop.mapred.JobTracker#getStartTime() */ @InterfaceAudience.Public @InterfaceStability.Stable Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Thu Sep 8 01:39:07 2011 @@ -22,8 +22,7 @@ import org.apache.hadoop.classification. /** * Place holder for cluster level configuration keys. * - * These keys are used by both {@link JobTracker} and {@link TaskTracker}. The - * keys should have "mapreduce.cluster." as the prefix. + * The keys should have "mapreduce.cluster." as the prefix. * */ @InterfaceAudience.Private Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java Thu Sep 8 01:39:07 2011 @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -47,6 +49,7 @@ import org.apache.hadoop.util.StringUtil @InterfaceAudience.Public @InterfaceStability.Evolving public class ControlledJob { + private static final Log LOG = LogFactory.getLog(ControlledJob.class); // A job will be in one of the following states public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED, @@ -235,6 +238,17 @@ public class ControlledJob { job.killJob(); } + public synchronized void failJob(String message) throws IOException, InterruptedException { + try { + if(job != null && this.state == State.RUNNING) { + job.killJob(); + } + } finally { + this.state = State.FAILED; + this.message = message; + } + } + /** * Check the state of this running job. The state may * remain the same, become SUCCESS or FAILED. @@ -322,6 +336,7 @@ public class ControlledJob { job.submit(); this.state = State.RUNNING; } catch (Exception ioe) { + LOG.info(getJobName()+" got an error while submitting ",ioe); this.state = State.FAILED; this.message = StringUtils.stringifyException(ioe); } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java Thu Sep 8 01:39:07 2011 @@ -21,13 +21,16 @@ package org.apache.hadoop.mapreduce.lib. import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Hashtable; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; -import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State; +import org.apache.hadoop.util.StringUtils; /** * This class encapsulates a set of MapReduce jobs and its dependency. @@ -49,17 +52,16 @@ import org.apache.hadoop.mapreduce.lib.j @InterfaceAudience.Public @InterfaceStability.Evolving public class JobControl implements Runnable { + private static final Log LOG = LogFactory.getLog(JobControl.class); // The thread can be in one of the following state public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY}; private ThreadState runnerState; // the thread state - private Map waitingJobs; - private Map readyJobs; - private Map runningJobs; - private Map successfulJobs; - private Map failedJobs; + private LinkedList jobsInProgress = new LinkedList(); + private LinkedList successfulJobs = new LinkedList(); + private LinkedList failedJobs = new LinkedList(); private long nextJobID; private String groupName; @@ -69,46 +71,51 @@ public class JobControl implements Runna * @param groupName a name identifying this group */ public JobControl(String groupName) { - this.waitingJobs = new Hashtable(); - this.readyJobs = new Hashtable(); - this.runningJobs = new Hashtable(); - this.successfulJobs = new Hashtable(); - this.failedJobs = new Hashtable(); this.nextJobID = -1; this.groupName = groupName; this.runnerState = ThreadState.READY; } private static List toList( - Map jobs) { + LinkedList jobs) { ArrayList retv = new ArrayList(); synchronized (jobs) { - for (ControlledJob job : jobs.values()) { + for (ControlledJob job : jobs) { retv.add(job); } } return retv; } + synchronized private List getJobsIn(State state) { + LinkedList l = new LinkedList(); + for(ControlledJob j: jobsInProgress) { + if(j.getJobState() == state) { + l.add(j); + } + } + return l; + } + /** * @return the jobs in the waiting state */ public List getWaitingJobList() { - return toList(this.waitingJobs); + return getJobsIn(State.WAITING); } /** * @return the jobs in the running state */ public List getRunningJobList() { - return toList(this.runningJobs); + return getJobsIn(State.RUNNING); } /** * @return the jobs in the ready state */ public List getReadyJobsList() { - return toList(this.readyJobs); + return getJobsIn(State.READY); } /** @@ -126,34 +133,6 @@ public class JobControl implements Runna nextJobID += 1; return this.groupName + this.nextJobID; } - - private static void addToQueue(ControlledJob aJob, - Map queue) { - synchronized(queue) { - queue.put(aJob.getJobID(), aJob); - } - } - - private void addToQueue(ControlledJob aJob) { - Map queue = getQueue(aJob.getJobState()); - addToQueue(aJob, queue); - } - - private Map getQueue(State state) { - Map retv = null; - if (state == State.WAITING) { - retv = this.waitingJobs; - } else if (state == State.READY) { - retv = this.readyJobs; - } else if (state == State.RUNNING) { - retv = this.runningJobs; - } else if (state == State.SUCCESS) { - retv = this.successfulJobs; - } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) { - retv = this.failedJobs; - } - return retv; - } /** * Add a new job. @@ -163,7 +142,7 @@ public class JobControl implements Runna String id = this.getNextJobID(); aJob.setJobID(id); aJob.setJobState(State.WAITING); - this.addToQueue(aJob); + jobsInProgress.add(aJob); return id; } @@ -211,47 +190,8 @@ public class JobControl implements Runna } } - synchronized private void checkRunningJobs() - throws IOException, InterruptedException { - - Map oldJobs = null; - oldJobs = this.runningJobs; - this.runningJobs = new Hashtable(); - - for (ControlledJob nextJob : oldJobs.values()) { - nextJob.checkState(); - this.addToQueue(nextJob); - } - } - - synchronized private void checkWaitingJobs() - throws IOException, InterruptedException { - Map oldJobs = null; - oldJobs = this.waitingJobs; - this.waitingJobs = new Hashtable(); - - for (ControlledJob nextJob : oldJobs.values()) { - nextJob.checkState(); - this.addToQueue(nextJob); - } - } - - synchronized private void startReadyJobs() { - Map oldJobs = null; - oldJobs = this.readyJobs; - this.readyJobs = new Hashtable(); - - for (ControlledJob nextJob : oldJobs.values()) { - //Submitting Job to Hadoop - nextJob.submit(); - this.addToQueue(nextJob); - } - } - synchronized public boolean allFinished() { - return this.waitingJobs.size() == 0 && - this.readyJobs.size() == 0 && - this.runningJobs.size() == 0; + return jobsInProgress.isEmpty(); } /** @@ -262,39 +202,83 @@ public class JobControl implements Runna * Submit the jobs in ready state */ public void run() { - this.runnerState = ThreadState.RUNNING; - while (true) { - while (this.runnerState == ThreadState.SUSPENDED) { + try { + this.runnerState = ThreadState.RUNNING; + while (true) { + while (this.runnerState == ThreadState.SUSPENDED) { + try { + Thread.sleep(5000); + } + catch (Exception e) { + //TODO the thread was interrupted, do something!!! + } + } + + synchronized(this) { + Iterator it = jobsInProgress.iterator(); + while(it.hasNext()) { + ControlledJob j = it.next(); + LOG.debug("Checking state of job "+j); + switch(j.checkState()) { + case SUCCESS: + successfulJobs.add(j); + it.remove(); + break; + case FAILED: + case DEPENDENT_FAILED: + failedJobs.add(j); + it.remove(); + break; + case READY: + j.submit(); + break; + case RUNNING: + case WAITING: + //Do Nothing + break; + } + } + } + + if (this.runnerState != ThreadState.RUNNING && + this.runnerState != ThreadState.SUSPENDED) { + break; + } try { Thread.sleep(5000); } catch (Exception e) { - + //TODO the thread was interrupted, do something!!! + } + if (this.runnerState != ThreadState.RUNNING && + this.runnerState != ThreadState.SUSPENDED) { + break; } } - try { - checkRunningJobs(); - checkWaitingJobs(); - startReadyJobs(); - } catch (Exception e) { - this.runnerState = ThreadState.STOPPED; - } - if (this.runnerState != ThreadState.RUNNING && - this.runnerState != ThreadState.SUSPENDED) { - break; - } - try { - Thread.sleep(5000); - } - catch (Exception e) { - - } - if (this.runnerState != ThreadState.RUNNING && - this.runnerState != ThreadState.SUSPENDED) { - break; - } + }catch(Throwable t) { + LOG.error("Error while trying to run jobs.",t); + //Mark all jobs as failed because we got something bad. + failAllJobs(t); } this.runnerState = ThreadState.STOPPED; } + synchronized private void failAllJobs(Throwable t) { + String message = "Unexpected System Error Occured: "+ + StringUtils.stringifyException(t); + Iterator it = jobsInProgress.iterator(); + while(it.hasNext()) { + ControlledJob j = it.next(); + try { + j.failJob(message); + } catch (IOException e) { + LOG.error("Error while tyring to clean up "+j.getJobName(), e); + } catch (InterruptedException e) { + LOG.error("Error while tyring to clean up "+j.getJobName(), e); + } finally { + failedJobs.add(j); + it.remove(); + } + } + } } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Thu Sep 8 01:39:07 2011 @@ -90,7 +90,9 @@ public class CompletedJob implements org report.setJobState(JobState.valueOf(jobInfo.getJobStatus())); report.setStartTime(jobInfo.getLaunchTime()); report.setFinishTime(jobInfo.getFinishTime()); - //TOODO Possibly populate job progress. Never used. + report.setJobName(jobInfo.getJobname()); + report.setUser(jobInfo.getUsername()); + //TODO Possibly populate job progress. Never used. //report.setMapProgress(progress) //report.setReduceProgress(progress) } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Thu Sep 8 01:39:07 2011 @@ -146,4 +146,10 @@ public class CompletedTaskAttempt implem public long getFinishTime() { return report.getFinishTime(); } + + @Override + public int getShufflePort() { + throw new UnsupportedOperationException("Not supported yet."); + } + } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Sep 8 01:39:07 2011 @@ -21,6 +21,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; import java.lang.reflect.Method; import java.security.PrivilegedAction; +import java.util.HashMap; import java.util.List; import org.apache.commons.logging.Log; @@ -50,7 +51,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; -import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig; +import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.UserGroupInformation; @@ -61,24 +62,20 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.SchedulerSecurityInfo; -import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo; class ClientServiceDelegate { private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); - private static final NotRunningJob NOTSTARTEDJOB = - new NotRunningJob(JobState.NEW); - - private static final NotRunningJob FAILEDJOB = - new NotRunningJob(JobState.FAILED); - - private static final NotRunningJob KILLEDJOB = - new NotRunningJob(JobState.KILLED); + + // Caches for per-user NotRunningJobs + private static HashMap> notRunningJobs = + new HashMap>(); private final Configuration conf; private final JobID jobId; @@ -101,6 +98,24 @@ class ClientServiceDelegate { this.appId = TypeConverter.toYarn(jobId).getAppId(); } + // Get the instance of the NotRunningJob corresponding to the specified + // user and state + private NotRunningJob getNotRunningJob(String user, JobState state) { + synchronized (notRunningJobs) { + HashMap map = notRunningJobs.get(state); + if (map == null) { + map = new HashMap(); + notRunningJobs.put(state, map); + } + NotRunningJob notRunningJob = map.get(user); + if (notRunningJob == null) { + notRunningJob = new NotRunningJob(user, state); + map.put(user, notRunningJob); + } + return notRunningJob; + } + } + private MRClientProtocol getProxy() throws YarnRemoteException { if (!forceRefresh && realProxy != null) { return realProxy; @@ -149,26 +164,30 @@ class ClientServiceDelegate { } } - /** we just want to return if its allocating, so that we dont + /** we just want to return if its allocating, so that we don't * block on it. This is to be able to return job status - * on a allocating Application. + * on an allocating Application. */ + String user = application.getUser(); + if (user == null) { + throw new YarnRemoteExceptionPBImpl("User is not set in the application report"); + } if (application.getState() == ApplicationState.NEW || application.getState() == ApplicationState.SUBMITTED) { realProxy = null; - return NOTSTARTEDJOB; + return getNotRunningJob(user, JobState.NEW); } if (application.getState() == ApplicationState.FAILED) { realProxy = null; - return FAILEDJOB; + return getNotRunningJob(user, JobState.FAILED); } if (application.getState() == ApplicationState.KILLED) { - realProxy = null; - return KILLEDJOB; - } + realProxy = null; + return getNotRunningJob(user, JobState.KILLED); + } //History server can serve a job only if application //succeeded. @@ -270,17 +289,15 @@ class ClientServiceDelegate { return result; } - JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException, - YarnRemoteException { + JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter.toYarn(oldJobID); - String stagingDir = conf.get("yarn.apps.stagingDir"); - String jobFile = stagingDir + "/" + jobId.toString(); - MRClientProtocol protocol; GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class); request.setJobId(jobId); - JobReport report = ((GetJobReportResponse) invoke("getJobReport", + JobReport report = ((GetJobReportResponse) invoke("getJobReport", GetJobReportRequest.class, request)).getJobReport(); + String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID); + //TODO: add tracking url in JobReport return TypeConverter.fromYarn(report, jobFile, ""); } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Thu Sep 8 01:39:07 2011 @@ -63,8 +63,10 @@ public class NotRunningJob implements MR RecordFactoryProvider.getRecordFactory(null); private final JobState jobState; + private final String user; - NotRunningJob(JobState jobState) { + NotRunningJob(String username, JobState jobState) { + this.user = username; this.jobState = jobState; } @@ -104,7 +106,10 @@ public class NotRunningJob implements MR JobReport jobReport = recordFactory.newRecordInstance(JobReport.class); jobReport.setJobId(request.getJobId()); - jobReport.setJobState(jobState); + jobReport.setJobState(this.jobState); + + jobReport.setUser(this.user); + // TODO: Add jobName & other job information that is available resp.setJobReport(jobReport); return resp; } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu Sep 8 01:39:07 2011 @@ -120,7 +120,7 @@ public class ResourceMgrDelegate { recordFactory.newRecordInstance(GetAllApplicationsRequest.class); GetAllApplicationsResponse response = applicationsManager.getAllApplications(request); - return TypeConverter.fromYarnApps(response.getApplicationList()); + return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf); } @@ -182,7 +182,7 @@ public class ResourceMgrDelegate { getQueueInfoRequest(queueName, true, false, false); recordFactory.newRecordInstance(GetQueueInfoRequest.class); return TypeConverter.fromYarn( - applicationsManager.getQueueInfo(request).getQueueInfo()); + applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf); } private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent, @@ -216,7 +216,7 @@ public class ResourceMgrDelegate { getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo(); getChildQueues(rootQueue, queues); - return TypeConverter.fromYarnQueueInfo(queues); + return TypeConverter.fromYarnQueueInfo(queues, this.conf); } @@ -229,7 +229,7 @@ public class ResourceMgrDelegate { getQueueInfoRequest(ROOT, false, true, false)).getQueueInfo(); getChildQueues(rootQueue, queues); - return TypeConverter.fromYarnQueueInfo(queues); + return TypeConverter.fromYarnQueueInfo(queues, this.conf); } public QueueInfo[] getChildQueues(String parent) throws IOException, @@ -242,7 +242,7 @@ public class ResourceMgrDelegate { getQueueInfoRequest(parent, false, true, false)).getQueueInfo(); getChildQueues(parentQueue, queues); - return TypeConverter.fromYarnQueueInfo(queues); + return TypeConverter.fromYarnQueueInfo(queues, this.conf); } public String getStagingAreaDir() throws IOException, InterruptedException { Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider Thu Sep 8 01:39:07 2011 @@ -12,17 +12,3 @@ # limitations under the License. # org.apache.hadoop.mapred.YarnClientProtocolProvider -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -org.apache.hadoop.mapred.YarnClientProtocolProvider \ No newline at end of file Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Sep 8 01:39:07 2011 @@ -268,6 +268,7 @@ public class TestClientRedirect { String[] split = AMHOSTADDRESS.split(":"); application.setHost(split[0]); application.setRpcPort(Integer.parseInt(split[1])); + application.setUser("TestClientRedirect-user"); GetApplicationReportResponse response = recordFactory .newRecordInstance(GetApplicationReportResponse.class); response.setApplicationReport(application); @@ -397,6 +398,11 @@ public class TestClientRedirect { JobReport jobReport = recordFactory.newRecordInstance(JobReport.class); jobReport.setJobId(request.getJobId()); jobReport.setJobState(JobState.RUNNING); + jobReport.setJobName("TestClientRedirect-jobname"); + jobReport.setUser("TestClientRedirect-user"); + jobReport.setStartTime(0L); + jobReport.setFinishTime(1L); + GetJobReportResponse response = recordFactory .newRecordInstance(GetJobReportResponse.class); response.setJobReport(jobReport); Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Thu Sep 8 01:39:07 2011 @@ -72,6 +72,8 @@ public class MiniMRYarnCluster extends M conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, Service.class); + // Non-standard shuffle port + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083); conf.setClass(NMConfig.NM_CONTAINER_EXECUTOR_CLASS, DefaultContainerExecutor.class, ContainerExecutor.class); Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Sep 8 01:39:07 2011 @@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.Mappe import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; @@ -105,7 +106,8 @@ public class TestMRJobs { if (mrCluster == null) { mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName()); - mrCluster.init(new Configuration()); + Configuration conf = new Configuration(); + mrCluster.init(conf); mrCluster.start(); } @@ -150,7 +152,7 @@ public class TestMRJobs { Assert.assertTrue(succeeded); Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); verifySleepJobCounters(job); - + verifyTaskProgress(job); // TODO later: add explicit "isUber()" checks of some sort (extend // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value @@ -172,6 +174,18 @@ public class TestMRJobs { .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); } + + protected void verifyTaskProgress(Job job) throws InterruptedException, + IOException { + for (TaskReport taskReport : job.getTaskReports(TaskType.MAP)) { + Assert.assertTrue(0.9999f < taskReport.getProgress() + && 1.0001f > taskReport.getProgress()); + } + for (TaskReport taskReport : job.getTaskReports(TaskType.REDUCE)) { + Assert.assertTrue(0.9999f < taskReport.getProgress() + && 1.0001f > taskReport.getProgress()); + } + } @Test public void testRandomWriter() throws IOException, InterruptedException, @@ -197,6 +211,7 @@ public class TestMRJobs { boolean succeeded = job.waitForCompletion(true); Assert.assertTrue(succeeded); Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + // Make sure there are three files in the output-dir RemoteIterator iterator = Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Sep 8 01:39:07 2011 @@ -120,7 +120,8 @@ public class ShuffleHandler extends Abst private static final JobTokenSecretManager secretManager = new JobTokenSecretManager(); - public static final String SHUFFLE_PORT = "mapreduce.shuffle.port"; + public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port"; + public static final int DEFAULT_SHUFFLE_PORT = 8080; @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @@ -155,15 +156,59 @@ public class ShuffleHandler extends Abst this(DefaultMetricsSystem.instance()); } + /** + * Serialize the shuffle port into a ByteBuffer for use later on. + * @param port the port to be sent to the ApplciationMaster + * @return the serialized form of the port. + */ + static ByteBuffer serializeMetaData(int port) throws IOException { + //TODO these bytes should be versioned + DataOutputBuffer port_dob = new DataOutputBuffer(); + port_dob.writeInt(port); + return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength()); + } + + /** + * A helper function to deserialize the metadata returned by ShuffleHandler. + * @param meta the metadata returned by the ShuffleHandler + * @return the port the Shuffle Handler is listening on to serve shuffle data. + */ + public static int deserializeMetaData(ByteBuffer meta) throws IOException { + //TODO this should be returning a class not just an int + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(meta); + int port = in.readInt(); + return port; + } + + /** + * A helper function to serialize the JobTokenIdentifier to be sent to the + * ShuffleHandler as ServiceData. + * @param jobToken the job token to be used for authentication of + * shuffle data requests. + * @return the serialized version of the jobToken. + */ + public static ByteBuffer serializeServiceData(Token jobToken) throws IOException { + //TODO these bytes should be versioned + DataOutputBuffer jobToken_dob = new DataOutputBuffer(); + jobToken.write(jobToken_dob); + return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()); + } + + static Token deserializeServiceData(ByteBuffer secret) throws IOException { + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(secret); + Token jt = new Token(); + jt.readFields(in); + return jt; + } + @Override public void initApp(String user, ApplicationId appId, ByteBuffer secret) { // TODO these bytes should be versioned try { - DataInputByteBuffer in = new DataInputByteBuffer(); - in.reset(secret); - Token jt = new Token(); - jt.readFields(in); - // TODO: Once SHuffle is out of NM, this can use MR APIs + Token jt = deserializeServiceData(secret); + // TODO: Once SHuffle is out of NM, this can use MR APIs JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); userRsrc.put(jobId.toString(), user); LOG.info("Added token for " + jobId.toString()); @@ -193,7 +238,7 @@ public class ShuffleHandler extends Abst Configuration conf = getConfig(); ServerBootstrap bootstrap = new ServerBootstrap(selector); bootstrap.setPipelineFactory(new HttpPipelineFactory(conf)); - port = conf.getInt("mapreduce.shuffle.port", 8080); + port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); accepted.add(bootstrap.bind(new InetSocketAddress(port))); LOG.info(getName() + " listening on port " + port); super.start(); @@ -207,6 +252,17 @@ public class ShuffleHandler extends Abst super.stop(); } + @Override + public synchronized ByteBuffer getMeta() { + try { + return serializeMetaData(port); + } catch (IOException e) { + LOG.error("Error during getMeta", e); + // TODO add API to AuxiliaryServices to report failures + return null; + } + } + Shuffle createShuffle() { return new Shuffle(getConfig()); } @@ -306,7 +362,7 @@ public class ShuffleHandler extends Abst HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); try { verifyRequest(jobId, ctx, request, response, - new URL("http", "", 8080, reqUri)); + new URL("http", "", port, reqUri)); } catch (IOException e) { LOG.warn("Shuffle failure ", e); sendError(ctx, e.getMessage(), UNAUTHORIZED); Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Thu Sep 8 01:39:07 2011 @@ -26,11 +26,21 @@ import static org.apache.hadoop.test.Met import org.jboss.netty.channel.ChannelFuture; import org.junit.Test; +import static org.junit.Assert.*; import static org.apache.hadoop.test.MockitoMaker.*; public class TestShuffleHandler { static final long MiB = 1024 * 1024; + @Test public void testSerializeMeta() throws Exception { + assertEquals(1, ShuffleHandler.deserializeMetaData( + ShuffleHandler.serializeMetaData(1))); + assertEquals(-1, ShuffleHandler.deserializeMetaData( + ShuffleHandler.serializeMetaData(-1))); + assertEquals(8080, ShuffleHandler.deserializeMetaData( + ShuffleHandler.serializeMetaData(8080))); + } + @Test public void testShuffleMetrics() throws Exception { MetricsSystem ms = new MetricsSystemImpl(); ShuffleHandler sh = new ShuffleHandler(ms); Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/bin/yarn URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/bin/yarn?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/bin/yarn (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/bin/yarn Thu Sep 8 01:39:07 2011 @@ -148,132 +148,18 @@ IFS= # add hadoop-common libs to CLASSPATH -if [ -d "$HADOOP_COMMON_HOME/build/classes" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/build/classes -fi -if [ -d "$HADOOP_COMMON_HOME/build/webapps" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/build -fi -if [ -d "$HADOOP_COMMON_HOME/build/test/classes" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/build/test/classes -fi -if [ -d "$HADOOP_COMMON_HOME/build/test/core/classes" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/build/test/core/classes -fi - -for f in $HADOOP_COMMON_HOME/hadoop-*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -for f in $HADOOP_COMMON_HOME/lib/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -for f in $HADOOP_COMMON_HOME/share/hadoop/common/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -for f in $HADOOP_COMMON_HOME/share/hadoop/common/lib/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -for f in $HADOOP_COMMON_HOME/share/hadoop/hdfs/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -if [ -d "$HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Common/common" ]; then -for f in $HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Common/common/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done -fi - -if [ -d "$HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Hdfs/common" ]; then -for f in $HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Hdfs/common/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done -fi - -if [ -d "$HADOOP_COMMON_HOME/build/ivy/lib/Hadoop/common" ]; then -for f in $HADOOP_COMMON_HOME/build/ivy/lib/Hadoop/common/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done -fi +CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/share/hadoop/common'/*' +CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/share/hadoop/common/lib'/*' # add hadoop-hdfs libs to CLASSPATH -for f in $HADOOP_HDFS_HOME/hadoop-*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -for f in $HADOOP_HDFS_HOME/lib/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -if [ -d "$HADOOP_HDFS_HOME/build/classes" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/build/classes -fi -if [ -d "$HADOOP_HDFS_HOME/build/webapps" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/build -fi -if [ -d "$HADOOP_HDFS_HOME/build/test/classes" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/build/test/classes -fi -if [ -d "$HADOOP_HDFS_HOME/build/tools" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/build/tools -fi - -# add hadoop-mapred libs to CLASSPATH - -for f in $HADOOP_HDFS_HOME/hadoop-*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -for f in $HADOOP_HDFS_HOME/lib/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -if [ -d "$HADOOP_MAPRED_HOME/build/classes" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/classes -fi -if [ -d "$HADOOP_MAPRED_HOME/build/webapps" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build -fi -if [ -d "$HADOOP_MAPRED_HOME/build/test/classes" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/test/classes -fi -if [ -d "$HADOOP_MAPRED_HOME/build/tools" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/tools -fi - -# for releases, add core mapred jar & webapps to CLASSPATH -if [ -d "$HADOOP_MAPRED_HOME/webapps" ]; then - CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME -fi - -# add libs to CLASSPATH -for f in $HADOOP_MAPRED_HOME/lib/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -# add libs to CLASSPATH -for f in $HADOOP_MAPRED_HOME/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -# add libs to CLASSPATH -for f in $YARN_HOME/lib/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done +CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs'/*' +CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib'/*' # add yarn libs to CLASSPATH -for f in $YARN_HOME/modules/*.jar; do - CLASSPATH=${CLASSPATH}:$f; -done - -# add user-specified CLASSPATH last -if [ "$YARN_USER_CLASSPATH_FIRST" = "" ] && [ "$YARN_CLASSPATH" != "" ]; then - CLASSPATH=${CLASSPATH}:${YARN_CLASSPATH} -fi + +CLASSPATH=${CLASSPATH}:$YARN_HOME/modules'/*' +CLASSPATH=${CLASSPATH}:$YARN_HOME/lib'/*' # default log directory & file if [ "$YARN_LOG_DIR" = "" ]; then Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java Thu Sep 8 01:39:07 2011 @@ -18,16 +18,94 @@ package org.apache.hadoop.yarn.api; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +/** + *

    The protocol between a live instance of ApplicationMaster + * and the ResourceManager.

    + * + *

    This is used by the ApplicationMaster to register/unregister + * and to request and obtain resources in the cluster from the + * ResourceManager.

    + */ +@Public +@Stable public interface AMRMProtocol { - public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException; - public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException;; - public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException; + + /** + *

    The interface used by a new ApplicationMaster to register + * with the ResourceManager.

    + * + *

    The ApplicationMaster needs to provide details such + * as RPC Port, HTTP tracking url etc. as specified in + * {@link RegisterApplicationMasterRequest}.

    + * + *

    The ResourceManager responds with critical details such + * as minimum and maximum resource capabilities in the cluster as specified in + * {@link RegisterApplicationMasterResponse}.

    + * + * @param request registration request + * @return registration respose + * @throws YarnRemoteException + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnRemoteException; + + /** + *

    The interface used by an ApplicationMaster to notify the + * ResourceManager about its completion (success or failed).

    + * + *

    The ApplicationMaster has to provide details such as + * final state, diagnostics (in case of failures) etc. as specified in + * {@link FinishApplicationMasterRequest}.

    + * + *

    The ResourceManager responds with + * {@link FinishApplicationMasterResponse}.

    + * + * @param request completion request + * @return completion response + * @throws YarnRemoteException + */ + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnRemoteException; + + /** + *

    The main interface between an ApplicationMaster + * and the ResourceManager.

    + * + *

    The ApplicationMaster uses this interface to provide a list + * of {@link ResourceRequest} and returns unused {@link Container} allocated + * to it via {@link AllocateRequest}.

    + * + *

    This also doubles up as a heartbeat to let the + * ResourceManager know that the ApplicationMaster + * is alive. Thus, applications should use periodically make this call to + * be kept alive.

    + * + *

    The ResourceManager responds with list of allocated + * {@link Container}, status of completed containers and headroom information + * for the application.

    + * + *

    The ApplicationMaster can use the available headroom + * (resources) to decide how to utilized allocated resources and make + * informed decisions about future resource requests.

    + * + * @param request allocation request + * @return allocation response + * @throws YarnRemoteException + */ + public AllocateResponse allocate(AllocateRequest request) + throws YarnRemoteException; } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java Thu Sep 8 01:39:07 2011 @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.api; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; + import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; @@ -36,16 +39,190 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +/** + *

    The protocol between clients and the ResourceManager + * to submit/abort jobs and to get information on applications, cluster metrics, + * nodes, queues and ACLs.

    + */ +@Public +@Stable public interface ClientRMProtocol { - public GetNewApplicationIdResponse getNewApplicationId(GetNewApplicationIdRequest request) throws YarnRemoteException; - public GetApplicationReportResponse getApplicationReport(GetApplicationReportRequest request) throws YarnRemoteException; - public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnRemoteException; - public FinishApplicationResponse finishApplication(FinishApplicationRequest request) throws YarnRemoteException; - public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request) throws YarnRemoteException; - public GetAllApplicationsResponse getAllApplications(GetAllApplicationsRequest request) throws YarnRemoteException; - public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) throws YarnRemoteException; - public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) throws YarnRemoteException; - public GetQueueUserAclsInfoResponse getQueueUserAcls(GetQueueUserAclsInfoRequest request) throws YarnRemoteException; + /** + *

    The interface used by clients to obtain a new {@link ApplicationId} for + * submitting new applications.

    + * + *

    The ResourceManager responds with a new, monotonically + * increasing, {@link ApplicationId} which is used by the client to submit + * a new application.

    + * + * @param request request to get a new ApplicationId + * @return new ApplicationId to be used to submit an application + * @throws YarnRemoteException + * @see #submitApplication(SubmitApplicationRequest) + */ + public GetNewApplicationIdResponse getNewApplicationId( + GetNewApplicationIdRequest request) + throws YarnRemoteException; + + /** + *

    The interface used by clients to submit a new application to the + * ResourceManager.

    + * + *

    The client is required to provide details such as queue, + * {@link Resource} required to run the ApplicationMaster, + * the equivalent of {@link ContainerLaunchContext} for launching + * the ApplicationMaster etc. via the + * {@link SubmitApplicationRequest}.

    + * + *

    Currently the ResourceManager sends an immediate (empty) + * {@link SubmitApplicationResponse} on accepting the submission and throws + * an exception if it rejects the submission.

    + * + *

    In secure mode,the ResourceManager verifies access to + * queues etc. before accepting the application submission.

    + * + * @param request request to submit a new application + * @return (empty) response on accepting the submission + * @throws YarnRemoteException + * @see #getNewApplicationId(GetNewApplicationIdRequest) + */ + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) + throws YarnRemoteException; + + /** + *

    The interface used by clients to request the + * ResourceManager to abort submitted application.

    + * + *

    The client, via {@link FinishApplicationRequest} provides the + * {@link ApplicationId} of the application to be aborted.

    + * + *

    In secure mode,the ResourceManager verifies access to the + * application, queue etc. before terminating the application.

    + * + *

    Currently, the ResourceManager returns an empty response + * on success and throws an exception on rejecting the request.

    + * + * @param request request to abort a submited application + * @return ResourceManager returns an empty response + * on success and throws an exception on rejecting the request + * @throws YarnRemoteException + * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest) + */ + public FinishApplicationResponse finishApplication( + FinishApplicationRequest request) + throws YarnRemoteException; + + /** + *

    The interface used by clients to get a report of an Application from + * the ResourceManager.

    + * + *

    The client, via {@link GetApplicationReportRequest} provides the + * {@link ApplicationId} of the application.

    + * + *

    In secure mode,the ResourceManager verifies access to the + * application, queue etc. before accepting the request.

    + * + *

    The ResourceManager responds with a + * {@link GetApplicationReportResponse} which includes the + * {@link ApplicationReport} for the application.

    + * + * @param request request for an application report + * @return application report + * @throws YarnRemoteException + */ + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) + throws YarnRemoteException; + + /** + *

    The interface used by clients to get metrics about the cluster from + * the ResourceManager.

    + * + *

    The ResourceManager responds with a + * {@link GetClusterMetricsResponse} which includes the + * {@link YarnClusterMetrics} with details such as number of current + * nodes in the cluster.

    + * + * @param request request for cluster metrics + * @return cluster metrics + * @throws YarnRemoteException + */ + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) + throws YarnRemoteException; + + /** + *

    The interface used by clients to get a report of all Applications + * in the cluster from the ResourceManager.

    + * + *

    The ResourceManager responds with a + * {@link GetAllApplicationsResponse} which includes the + * {@link ApplicationReport} for all the applications.

    + * + * @param request request for report on all running applications + * @return report on all running applications + * @throws YarnRemoteException + */ + public GetAllApplicationsResponse getAllApplications( + GetAllApplicationsRequest request) + throws YarnRemoteException; + + /** + *

    The interface used by clients to get a report of all nodes + * in the cluster from the ResourceManager.

    + * + *

    The ResourceManager responds with a + * {@link GetClusterNodesResponse} which includes the + * {@link NodeReport} for all the nodes in the cluster.

    + * + * @param request request for report on all nodes + * @return report on all nodes + * @throws YarnRemoteException + */ + public GetClusterNodesResponse getClusterNodes( + GetClusterNodesRequest request) + throws YarnRemoteException; + + /** + *

    The interface used by clients to get information about queues + * from the ResourceManager.

    + * + *

    The client, via {@link GetQueueInfoRequest}, can ask for details such + * as used/total resources, child queues, running applications etc.

    + * + *

    In secure mode,the ResourceManager verifies access before + * providing the information.

    + * + * @param request request to get queue information + * @return queue information + * @throws YarnRemoteException + */ + public GetQueueInfoResponse getQueueInfo( + GetQueueInfoRequest request) + throws YarnRemoteException; + + /** + *

    The interface used by clients to get information about queue + * acls for current users from the ResourceManager. + *

    + * + *

    The ResourceManager responds with queue acls for all + * existing queues.

    + * + * @param request request to get queue acls for current user + * @return queue acls for current user + * @throws YarnRemoteException + */ + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) + throws YarnRemoteException; } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManager.java?rev=1166495&r1=1166494&r2=1166495&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManager.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManager.java Thu Sep 8 01:39:07 2011 @@ -18,21 +18,108 @@ package org.apache.hadoop.yarn.api; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +/** + *

    The protocol between an ApplicationMaster and a + * NodeManager to start/stop containers and to get status + * of running containers.

    + * + *

    If security is enabled the NodeManager verifies that the + * ApplicationMaster has truly been allocated the container + * by the ResourceManager and also verifies all interactions such + * as stopping the container or obtaining status information for the container. + *

    + */ +@Public +@Stable public interface ContainerManager { + /** + *

    The ApplicationMaster requests a NodeManager + * to start a {@link Container} allocated to it using this interface. + *

    + * + *

    The ApplicationMaster has to provide details such as + * allocated resource capability, security tokens (if enabled), command + * to be executed to start the container, environment for the process, + * necessary binaries/jar/shared-objects etc. via the + * {@link ContainerLaunchContext} in the {@link StartContainerRequest}.

    + * + *

    Currently the NodeManager sends an immediate, empty + * response via {@link StartContainerResponse} to signify acceptance of the + * request and throws an exception in case of errors. The + * ApplicationMaster can use + * {@link #getContainerStatus(GetContainerStatusRequest)} to get updated + * status of the to-be-launched or launched container.

    + * + * @param request request to start a container + * @return empty response to indicate acceptance of the request + * or an exception + * @throws YarnRemoteException + */ + @Public + @Stable StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException; + /** + *

    The ApplicationMaster requests a NodeManager + * to stop a {@link Container} allocated to it using this interface. + *

    + * + *

    The ApplicationMaster

    sends a + * {@link StopContainerRequest} which includes the {@link ContainerId} of the + * container to be stopped.

    + * + *

    Currently the NodeManager sends an immediate, empty + * response via {@link StopContainerResponse} to signify acceptance of the + * request and throws an exception in case of errors. The + * ApplicationMaster can use + * {@link #getContainerStatus(GetContainerStatusRequest)} to get updated + * status of the container.

    + * + * @param request request to stop a container + * @return empty response to indicate acceptance of the request + * or an exception + * @throws YarnRemoteException + */ + @Public + @Stable StopContainerResponse stopContainer(StopContainerRequest request) throws YarnRemoteException; + /** + *

    The api used by the ApplicationMaster to request for + * current status of a Container from the + * NodeManager.

    + * + *

    The ApplicationMaster

    sends a + * {@link GetContainerStatusRequest} which includes the {@link ContainerId} of + * the container whose status is needed.

    + * + *

    The NodeManager responds with + *{@link GetContainerStatusResponse} which includes the + *{@link ContainerStatus} of the container.

    + * + * @param request request to get ContainerStatus of a container + * with the specified ContainerId + * @return ContainerStatus of the container + * @throws YarnRemoteException + */ + @Public + @Stable GetContainerStatusResponse getContainerStatus( GetContainerStatusRequest request) throws YarnRemoteException; }