Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A979572FA for ; Wed, 28 Sep 2011 07:31:30 +0000 (UTC) Received: (qmail 16288 invoked by uid 500); 28 Sep 2011 07:31:30 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 16240 invoked by uid 500); 28 Sep 2011 07:31:29 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 16223 invoked by uid 99); 28 Sep 2011 07:31:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Sep 2011 07:31:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Sep 2011 07:31:25 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id BDED5238890A; Wed, 28 Sep 2011 07:31:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1176762 [1/2] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduc... Date: Wed, 28 Sep 2011 07:31:04 -0000 To: mapreduce-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20110928073105.BDED5238890A@eris.apache.org> Author: acmurthy Date: Wed Sep 28 07:31:03 2011 New Revision: 1176762 URL: http://svn.apache.org/viewvc?rev=1176762&view=rev Log: MAPREDUCE-3078. Ensure MapReduce AM reports progress correctly for displaying on the RM Web-UI. Contributed by Vinod K V. Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Sep 28 07:31:03 2011 @@ -1469,6 +1469,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3110. Fixed TestRPC failure. (vinodkv) + MAPREDUCE-3078. Ensure MapReduce AM reports progress correctly for + displaying on the RM Web-UI. (vinodkv via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml Wed Sep 28 07:31:03 2011 @@ -57,6 +57,12 @@ org.apache.hadoop + hadoop-yarn-server-resourcemanager + test-jar + test + + + org.apache.hadoop hadoop-mapreduce-client-shuffle Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Sep 28 07:31:03 2011 @@ -549,9 +549,9 @@ public class MRAppMaster extends Composi // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster"); - /** create a job event for job intialization */ + // create a job event for job intialization JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); - /** send init to the job (this does NOT trigger job execution) */ + // Send init to the job (this does NOT trigger job execution) // This is a synchronous call, not an event through dispatcher. We want // job-init to be done completely here. jobEventDispatcher.handle(initJobEvent); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Wed Sep 28 07:31:03 2011 @@ -92,6 +92,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -584,25 +585,17 @@ public class JobImpl implements org.apac public JobReport getReport() { readLock.lock(); try { - JobReport report = recordFactory.newRecordInstance(JobReport.class); - report.setJobId(jobId); - report.setJobState(getState()); - - // TODO - Fix to correctly setup report and to check state - if (report.getJobState() == JobState.NEW) { - return report; + JobState state = getState(); + + if (getState() == JobState.NEW) { + return MRBuilderUtils.newJobReport(jobId, jobName, username, state, + startTime, finishTime, setupProgress, 0.0f, + 0.0f, cleanupProgress); } - - report.setStartTime(startTime); - report.setFinishTime(finishTime); - report.setSetupProgress(setupProgress); - report.setCleanupProgress(cleanupProgress); - report.setMapProgress(computeProgress(mapTasks)); - report.setReduceProgress(computeProgress(reduceTasks)); - report.setJobName(jobName); - report.setUser(username); - return report; + return MRBuilderUtils.newJobReport(jobId, jobName, username, state, + startTime, finishTime, setupProgress, computeProgress(mapTasks), + computeProgress(reduceTasks), cleanupProgress); } finally { readLock.unlock(); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Wed Sep 28 07:31:03 2011 @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app.local; +import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -30,15 +31,19 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; /** @@ -66,6 +71,20 @@ public class LocalContainerAllocator ext } @Override + protected synchronized void heartbeat() throws Exception { + AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( + this.applicationAttemptId, this.lastResponseID, super + .getApplicationProgress(), new ArrayList(), + new ArrayList()); + AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); + AMResponse response = allocateResponse.getAMResponse(); + if (response.getReboot()) { + // TODO + LOG.info("Event from RM: shutting down Application Master"); + } + } + + @Override public void handle(ContainerAllocatorEvent event) { if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { LOG.info("Processing the event " + event.toString()); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Wed Sep 28 07:31:03 2011 @@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2.a import java.io.IOException; import java.security.PrivilegedAction; -import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +28,7 @@ import org.apache.hadoop.mapreduce.JobID import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; @@ -42,17 +42,12 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -64,7 +59,7 @@ import org.apache.hadoop.yarn.service.Ab /** * Registers/unregisters to RM and sends heartbeats to RM. */ -public class RMCommunicator extends AbstractService { +public abstract class RMCommunicator extends AbstractService { private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); private int rmPollInterval;//millis protected ApplicationId applicationId; @@ -74,7 +69,7 @@ public class RMCommunicator extends Abst protected EventHandler eventHandler; protected AMRMProtocol scheduler; private final ClientService clientService; - private int lastResponseID; + protected int lastResponseID; private Resource minContainerCapability; private Resource maxContainerCapability; @@ -121,6 +116,34 @@ public class RMCommunicator extends Abst return job; } + /** + * Get the appProgress. Can be used only after this component is started. + * @return the appProgress. + */ + protected float getApplicationProgress() { + // For now just a single job. In future when we have a DAG, we need an + // aggregate progress. + JobReport report = this.job.getReport(); + float setupWeight = 0.05f; + float cleanupWeight = 0.05f; + float mapWeight = 0.0f; + float reduceWeight = 0.0f; + int numMaps = this.job.getTotalMaps(); + int numReduces = this.job.getTotalReduces(); + if (numMaps == 0 && numReduces == 0) { + } else if (numMaps == 0) { + reduceWeight = 0.9f; + } else if (numReduces == 0) { + mapWeight = 0.9f; + } else { + mapWeight = reduceWeight = 0.45f; + } + return (report.getSetupProgress() * setupWeight + + report.getCleanupProgress() * cleanupWeight + + report.getMapProgress() * mapWeight + report.getReduceProgress() + * reduceWeight); + } + protected void register() { //Register String host = @@ -262,18 +285,5 @@ public class RMCommunicator extends Abst }); } - protected synchronized void heartbeat() throws Exception { - AllocateRequest allocateRequest = - recordFactory.newRecordInstance(AllocateRequest.class); - allocateRequest.setApplicationAttemptId(applicationAttemptId); - allocateRequest.setResponseId(lastResponseID); - allocateRequest.addAllAsks(new ArrayList()); - allocateRequest.addAllReleases(new ArrayList()); - AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); - AMResponse response = allocateResponse.getAMResponse(); - if (response.getReboot()) { - LOG.info("Event from RM: shutting down Application Master"); - } - } - + protected abstract void heartbeat() throws Exception; } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Wed Sep 28 07:31:03 2011 @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.BuilderUtils; /** * Keeps the data structures to send container requests to RM. @@ -107,15 +108,11 @@ public abstract class RMContainerRequest LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode); } - protected abstract void heartbeat() throws Exception; - protected AMResponse makeRemoteRequest() throws YarnRemoteException { - AllocateRequest allocateRequest = recordFactory - .newRecordInstance(AllocateRequest.class); - allocateRequest.setApplicationAttemptId(applicationAttemptId); - allocateRequest.setResponseId(lastResponseID); - allocateRequest.addAllAsks(new ArrayList(ask)); - allocateRequest.addAllReleases(new ArrayList(release)); + AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( + applicationAttemptId, lastResponseID, super.getApplicationProgress(), + new ArrayList(ask), new ArrayList( + release)); AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); AMResponse response = allocateResponse.getAMResponse(); lastResponseID = response.getResponseId(); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Wed Sep 28 07:31:03 2011 @@ -18,12 +18,15 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import junit.framework.Assert; @@ -32,475 +35,651 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; -import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationMaster; -import org.apache.hadoop.yarn.api.records.ApplicationStatus; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; -import org.junit.BeforeClass; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.After; import org.junit.Test; public class TestRMContainerAllocator { -// private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class); -// private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); -// -// @BeforeClass -// public static void preTests() { -// DefaultMetricsSystem.shutdown(); -// } -// -// @Test -// public void testSimple() throws Exception { -// FifoScheduler scheduler = createScheduler(); -// LocalRMContainerAllocator allocator = new LocalRMContainerAllocator( -// scheduler, new Configuration()); -// -// //add resources to scheduler -// RMNode nodeManager1 = addNode(scheduler, "h1", 10240); -// RMNode nodeManager2 = addNode(scheduler, "h2", 10240); -// RMNode nodeManager3 = addNode(scheduler, "h3", 10240); -// -// //create the container request -// ContainerRequestEvent event1 = -// createReq(1, 1024, new String[]{"h1"}); -// allocator.sendRequest(event1); -// -// //send 1 more request with different resource req -// ContainerRequestEvent event2 = createReq(2, 1024, new String[]{"h2"}); -// allocator.sendRequest(event2); -// -// //this tells the scheduler about the requests -// //as nodes are not added, no allocations -// List assigned = allocator.schedule(); -// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); -// -// //send another request with different resource and priority -// ContainerRequestEvent event3 = createReq(3, 1024, new String[]{"h3"}); -// allocator.sendRequest(event3); -// -// //this tells the scheduler about the requests -// //as nodes are not added, no allocations -// assigned = allocator.schedule(); -// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); -// -// //update resources in scheduler -// scheduler.nodeUpdate(nodeManager1); // Node heartbeat -// scheduler.nodeUpdate(nodeManager2); // Node heartbeat -// scheduler.nodeUpdate(nodeManager3); // Node heartbeat -// -// -// assigned = allocator.schedule(); -// checkAssignments( -// new ContainerRequestEvent[]{event1, event2, event3}, assigned, false); -// } -// -// //TODO: Currently Scheduler seems to have bug where it does not work -// //for Application asking for containers with different capabilities. -// //@Test -// public void testResource() throws Exception { -// FifoScheduler scheduler = createScheduler(); -// LocalRMContainerAllocator allocator = new LocalRMContainerAllocator( -// scheduler, new Configuration()); -// -// //add resources to scheduler -// RMNode nodeManager1 = addNode(scheduler, "h1", 10240); -// RMNode nodeManager2 = addNode(scheduler, "h2", 10240); -// RMNode nodeManager3 = addNode(scheduler, "h3", 10240); -// -// //create the container request -// ContainerRequestEvent event1 = -// createReq(1, 1024, new String[]{"h1"}); -// allocator.sendRequest(event1); -// -// //send 1 more request with different resource req -// ContainerRequestEvent event2 = createReq(2, 2048, new String[]{"h2"}); -// allocator.sendRequest(event2); -// -// //this tells the scheduler about the requests -// //as nodes are not added, no allocations -// List assigned = allocator.schedule(); -// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); -// -// //update resources in scheduler -// scheduler.nodeUpdate(nodeManager1); // Node heartbeat -// scheduler.nodeUpdate(nodeManager2); // Node heartbeat -// scheduler.nodeUpdate(nodeManager3); // Node heartbeat -// -// assigned = allocator.schedule(); -// checkAssignments( -// new ContainerRequestEvent[]{event1, event2}, assigned, false); -// } -// -// @Test -// public void testMapReduceScheduling() throws Exception { -// FifoScheduler scheduler = createScheduler(); -// Configuration conf = new Configuration(); -// LocalRMContainerAllocator allocator = new LocalRMContainerAllocator( -// scheduler, conf); -// -// //add resources to scheduler -// RMNode nodeManager1 = addNode(scheduler, "h1", 1024); -// RMNode nodeManager2 = addNode(scheduler, "h2", 10240); -// RMNode nodeManager3 = addNode(scheduler, "h3", 10240); -// -// //create the container request -// //send MAP request -// ContainerRequestEvent event1 = -// createReq(1, 2048, new String[]{"h1", "h2"}, true, false); -// allocator.sendRequest(event1); -// -// //send REDUCE request -// ContainerRequestEvent event2 = createReq(2, 3000, new String[]{"h1"}, false, true); -// allocator.sendRequest(event2); -// -// //send MAP request -// ContainerRequestEvent event3 = createReq(3, 2048, new String[]{"h3"}, false, false); -// allocator.sendRequest(event3); -// -// //this tells the scheduler about the requests -// //as nodes are not added, no allocations -// List assigned = allocator.schedule(); -// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); -// -// //update resources in scheduler -// scheduler.nodeUpdate(nodeManager1); // Node heartbeat -// scheduler.nodeUpdate(nodeManager2); // Node heartbeat -// scheduler.nodeUpdate(nodeManager3); // Node heartbeat -// -// assigned = allocator.schedule(); -// checkAssignments( -// new ContainerRequestEvent[]{event1, event3}, assigned, false); -// -// //validate that no container is assigned to h1 as it doesn't have 2048 -// for (TaskAttemptContainerAssignedEvent assig : assigned) { -// Assert.assertFalse("Assigned count not correct", -// "h1".equals(assig.getContainer().getNodeId().getHost())); -// } -// } -// -// -// -// private RMNode addNode(FifoScheduler scheduler, -// String nodeName, int memory) { -// NodeId nodeId = recordFactory.newRecordInstance(NodeId.class); -// nodeId.setHost(nodeName); -// nodeId.setPort(1234); -// Resource resource = recordFactory.newRecordInstance(Resource.class); -// resource.setMemory(memory); -// RMNode nodeManager = new RMNodeImpl(nodeId, null, nodeName, 0, 0, -// ResourceTrackerService.resolve(nodeName), resource); -// scheduler.addNode(nodeManager); // Node registration -// return nodeManager; -// } -// -// private FifoScheduler createScheduler() throws YarnRemoteException { -// FifoScheduler fsc = new FifoScheduler() { -// //override this to copy the objects -// //otherwise FifoScheduler updates the numContainers in same objects as kept by -// //RMContainerAllocator -// -// @Override -// public synchronized void allocate(ApplicationAttemptId applicationId, -// List ask) { -// List askCopy = new ArrayList(); -// for (ResourceRequest req : ask) { -// ResourceRequest reqCopy = recordFactory.newRecordInstance(ResourceRequest.class); -// reqCopy.setPriority(req.getPriority()); -// reqCopy.setHostName(req.getHostName()); -// reqCopy.setCapability(req.getCapability()); -// reqCopy.setNumContainers(req.getNumContainers()); -// askCopy.add(reqCopy); -// } -// super.allocate(applicationId, askCopy); -// } -// }; -// try { -// fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null); -// fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class), -// recordFactory.newRecordInstance(ApplicationMaster.class), -// "test", null, null, StoreFactory.createVoidAppStore()); -// } catch(IOException ie) { -// LOG.info("add application failed with ", ie); -// assert(false); -// } -// return fsc; -// } -// -// private ContainerRequestEvent createReq( -// int attemptid, int memory, String[] hosts) { -// return createReq(attemptid, memory, hosts, false, false); -// } -// -// private ContainerRequestEvent createReq( -// int attemptid, int memory, String[] hosts, boolean earlierFailedAttempt, boolean reduce) { -// ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); -// appId.setClusterTimestamp(0); -// appId.setId(0); -// JobId jobId = recordFactory.newRecordInstance(JobId.class); -// jobId.setAppId(appId); -// jobId.setId(0); -// TaskId taskId = recordFactory.newRecordInstance(TaskId.class); -// taskId.setId(0); -// taskId.setJobId(jobId); -// if (reduce) { -// taskId.setTaskType(TaskType.REDUCE); -// } else { -// taskId.setTaskType(TaskType.MAP); -// } -// TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class); -// attemptId.setId(attemptid); -// attemptId.setTaskId(taskId); -// Resource containerNeed = recordFactory.newRecordInstance(Resource.class); -// containerNeed.setMemory(memory); -// if (earlierFailedAttempt) { -// return ContainerRequestEvent. -// createContainerRequestEventForFailedContainer(attemptId, containerNeed); -// } -// return new ContainerRequestEvent(attemptId, -// containerNeed, -// hosts, new String[] {NetworkTopology.DEFAULT_RACK}); -// } -// -// private void checkAssignments(ContainerRequestEvent[] requests, -// List assignments, -// boolean checkHostMatch) { -// Assert.assertNotNull("Container not assigned", assignments); -// Assert.assertEquals("Assigned count not correct", -// requests.length, assignments.size()); -// -// //check for uniqueness of containerIDs -// Set containerIds = new HashSet(); -// for (TaskAttemptContainerAssignedEvent assigned : assignments) { -// containerIds.add(assigned.getContainer().getId()); -// } -// Assert.assertEquals("Assigned containers must be different", -// assignments.size(), containerIds.size()); -// -// //check for all assignment -// for (ContainerRequestEvent req : requests) { -// TaskAttemptContainerAssignedEvent assigned = null; -// for (TaskAttemptContainerAssignedEvent ass : assignments) { -// if (ass.getTaskAttemptID().equals(req.getAttemptID())){ -// assigned = ass; -// break; -// } -// } -// checkAssignment(req, assigned, checkHostMatch); -// } -// } -// -// private void checkAssignment(ContainerRequestEvent request, -// TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) { -// Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(), -// assigned); -// Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(), -// assigned.getTaskAttemptID()); -// if (checkHostMatch) { -// Assert.assertTrue("Not assigned to requested host", Arrays.asList( -// request.getHosts()).contains( -// assigned.getContainer().getNodeId().toString())); -// } -// -// } -// -// //Mock RMContainerAllocator -// //Instead of talking to remote Scheduler,uses the local Scheduler -// public static class LocalRMContainerAllocator extends RMContainerAllocator { -// private static final List events = -// new ArrayList(); -// -// public static class AMRMProtocolImpl implements AMRMProtocol { -// -// private ResourceScheduler resourceScheduler; -// -// public AMRMProtocolImpl(ResourceScheduler resourceScheduler) { -// this.resourceScheduler = resourceScheduler; -// } -// -// @Override -// public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException { -// RegisterApplicationMasterResponse response = recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class); -// return response; -// } -// -// public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException { -// List ask = request.getAskList(); -// List release = request.getReleaseList(); -// try { -// AMResponse response = recordFactory.newRecordInstance(AMResponse.class); -// Allocation allocation = resourceScheduler.allocate(request.getApplicationAttemptId(), ask); -// response.addAllNewContainers(allocation.getContainers()); -// response.setAvailableResources(allocation.getResourceLimit()); -// AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class); -// allocateResponse.setAMResponse(response); -// return allocateResponse; -// } catch(IOException ie) { -// throw RPCUtil.getRemoteException(ie); -// } -// } -// -// @Override -// public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException { -// FinishApplicationMasterResponse response = recordFactory.newRecordInstance(FinishApplicationMasterResponse.class); -// return response; -// } -// -// } -// -// private ResourceScheduler scheduler; -// LocalRMContainerAllocator(ResourceScheduler scheduler, Configuration conf) { -// super(null, new TestContext(events)); -// this.scheduler = scheduler; -// super.init(conf); -// super.start(); -// } -// -// protected AMRMProtocol createSchedulerProxy() { -// return new AMRMProtocolImpl(scheduler); -// } -// -// @Override -// protected void register() {} -// @Override -// protected void unregister() {} -// -// @Override -// protected Resource getMinContainerCapability() { -// Resource res = recordFactory.newRecordInstance(Resource.class); -// res.setMemory(1024); -// return res; -// } -// -// @Override -// protected Resource getMaxContainerCapability() { -// Resource res = recordFactory.newRecordInstance(Resource.class); -// res.setMemory(10240); -// return res; -// } -// -// public void sendRequest(ContainerRequestEvent req) { -// sendRequests(Arrays.asList(new ContainerRequestEvent[]{req})); -// } -// -// public void sendRequests(List reqs) { -// for (ContainerRequestEvent req : reqs) { -// handle(req); -// } -// } -// -// //API to be used by tests -// public List schedule() { -// //run the scheduler -// try { -// heartbeat(); -// } catch (Exception e) { -// LOG.error("error in heartbeat ", e); -// throw new YarnException(e); -// } -// -// List result = new ArrayList(events); -// events.clear(); -// return result; -// } -// -// protected void startAllocatorThread() { -// //override to NOT start thread -// } -// -// static class TestContext implements AppContext { -// private List events; -// TestContext(List events) { -// this.events = events; -// } -// @Override -// public Map getAllJobs() { -// return null; -// } -// @Override -// public ApplicationAttemptId getApplicationAttemptId() { -// return recordFactory.newRecordInstance(ApplicationAttemptId.class); -// } -// @Override -// public ApplicationId getApplicationID() { -// return recordFactory.newRecordInstance(ApplicationId.class); -// } -// @Override -// public EventHandler getEventHandler() { -// return new EventHandler() { -// @Override -// public void handle(Event event) { -// events.add((TaskAttemptContainerAssignedEvent) event); -// } -// }; -// } -// @Override -// public Job getJob(JobId jobID) { -// return null; -// } -// -// @Override -// public String getUser() { -// return null; -// } -// -// @Override -// public Clock getClock() { -// return null; -// } -// -// @Override -// public String getApplicationName() { -// return null; -// } -// -// @Override -// public long getStartTime() { -// return 0; -// } -// } -// } -// -// public static void main(String[] args) throws Exception { -// TestRMContainerAllocator t = new TestRMContainerAllocator(); -// t.testSimple(); -// //t.testResource(); -// t.testMapReduceScheduling(); -// } + + static final Log LOG = LogFactory + .getLog(TestRMContainerAllocator.class); + static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + @After + public void tearDown() { + DefaultMetricsSystem.shutdown(); + } + + @Test + public void testSimple() throws Exception { + + LOG.info("Running testSimple"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, + 0, 0, 0, 0, 0, 0)); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // add resources to scheduler + MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); + MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); + MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); + dispatcher.await(); + + // create the container request + ContainerRequestEvent event1 = createReq(jobId, 1, 1024, + new String[] { "h1" }); + allocator.sendRequest(event1); + + // send 1 more request with different resource req + ContainerRequestEvent event2 = createReq(jobId, 2, 1024, + new String[] { "h2" }); + allocator.sendRequest(event2); + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + List assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // send another request with different resource and priority + ContainerRequestEvent event3 = createReq(jobId, 3, 1024, + new String[] { "h3" }); + allocator.sendRequest(event3); + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // update resources in scheduler + nodeManager1.nodeHeartbeat(true); // Node heartbeat + nodeManager2.nodeHeartbeat(true); // Node heartbeat + nodeManager3.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + assigned = allocator.schedule(); + dispatcher.await(); + checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, + assigned, false); + } + + @Test + public void testResource() throws Exception { + + LOG.info("Running testResource"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, + 0, 0, 0, 0, 0, 0)); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // add resources to scheduler + MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); + MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); + MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); + dispatcher.await(); + + // create the container request + ContainerRequestEvent event1 = createReq(jobId, 1, 1024, + new String[] { "h1" }); + allocator.sendRequest(event1); + + // send 1 more request with different resource req + ContainerRequestEvent event2 = createReq(jobId, 2, 2048, + new String[] { "h2" }); + allocator.sendRequest(event2); + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + List assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // update resources in scheduler + nodeManager1.nodeHeartbeat(true); // Node heartbeat + nodeManager2.nodeHeartbeat(true); // Node heartbeat + nodeManager3.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + assigned = allocator.schedule(); + dispatcher.await(); + checkAssignments(new ContainerRequestEvent[] { event1, event2 }, + assigned, false); + } + + @Test + public void testMapReduceScheduling() throws Exception { + + LOG.info("Running testMapReduceScheduling"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, + 0, 0, 0, 0, 0, 0)); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // add resources to scheduler + MockNM nodeManager1 = rm.registerNode("h1:1234", 1024); + MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); + MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); + dispatcher.await(); + + // create the container request + // send MAP request + ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { + "h1", "h2" }, true, false); + allocator.sendRequest(event1); + + // send REDUCE request + ContainerRequestEvent event2 = createReq(jobId, 2, 3000, + new String[] { "h1" }, false, true); + allocator.sendRequest(event2); + + // send MAP request + ContainerRequestEvent event3 = createReq(jobId, 3, 2048, + new String[] { "h3" }, false, false); + allocator.sendRequest(event3); + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + List assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // update resources in scheduler + nodeManager1.nodeHeartbeat(true); // Node heartbeat + nodeManager2.nodeHeartbeat(true); // Node heartbeat + nodeManager3.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + assigned = allocator.schedule(); + dispatcher.await(); + checkAssignments(new ContainerRequestEvent[] { event1, event3 }, + assigned, false); + + // validate that no container is assigned to h1 as it doesn't have 2048 + for (TaskAttemptContainerAssignedEvent assig : assigned) { + Assert.assertFalse("Assigned count not correct", "h1".equals(assig + .getContainer().getNodeId().getHost())); + } + } + + private static class MyResourceManager extends MockRM { + + public MyResourceManager(Configuration conf) { + super(conf); + } + + @Override + protected Dispatcher createDispatcher() { + return new DrainDispatcher(); + } + + @Override + protected EventHandler createSchedulerEventDispatcher() { + // Dispatch inline for test sanity + return new EventHandler() { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + @Override + protected ResourceScheduler createScheduler() { + return new MyFifoScheduler(getRMContext()); + } + } + + private static class FakeJob extends JobImpl { + + public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf, + int numMaps, int numReduces) { + super(appAttemptID, conf, null, null, null, null, null, null, null, + null); + this.jobId = MRBuilderUtils + .newJobId(appAttemptID.getApplicationId(), 0); + this.numMaps = numMaps; + this.numReduces = numReduces; + } + + private float setupProgress; + private float mapProgress; + private float reduceProgress; + private float cleanupProgress; + private final int numMaps; + private final int numReduces; + private JobId jobId; + + void setProgress(float setupProgress, float mapProgress, + float reduceProgress, float cleanupProgress) { + this.setupProgress = setupProgress; + this.mapProgress = mapProgress; + this.reduceProgress = reduceProgress; + this.cleanupProgress = cleanupProgress; + } + + @Override + public int getTotalMaps() { return this.numMaps; } + @Override + public int getTotalReduces() { return this.numReduces;} + + @Override + public JobReport getReport() { + return MRBuilderUtils.newJobReport(this.jobId, "job", "user", + JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress, + this.reduceProgress, this.cleanupProgress); + } + } + + @Test + public void testReportedAppProgress() throws Exception { + + LOG.info("Running testReportedAppProgress"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + FakeJob job = new FakeJob(appAttemptId, conf, 2, 2); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, job); + + allocator.schedule(); // Send heartbeat + dispatcher.await(); + Assert.assertEquals(0.0, app.getProgress(), 0.0); + + job.setProgress(100, 10, 0, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(9.5f, app.getProgress(), 0.0); + + job.setProgress(100, 80, 0, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(41.0f, app.getProgress(), 0.0); + + job.setProgress(100, 100, 20, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(59.0f, app.getProgress(), 0.0); + + job.setProgress(100, 100, 100, 100); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(100.0f, app.getProgress(), 0.0); + } + + @Test + public void testReportedAppProgressWithOnlyMaps() throws Exception { + + LOG.info("Running testReportedAppProgressWithOnlyMaps"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + FakeJob job = new FakeJob(appAttemptId, conf, 2, 0); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, job); + + allocator.schedule(); // Send heartbeat + dispatcher.await(); + Assert.assertEquals(0.0, app.getProgress(), 0.0); + + job.setProgress(100, 10, 0, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(14f, app.getProgress(), 0.0); + + job.setProgress(100, 60, 0, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(59.0f, app.getProgress(), 0.0); + + job.setProgress(100, 100, 0, 100); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(100.0f, app.getProgress(), 0.0); + } + + private static class MyFifoScheduler extends FifoScheduler { + + public MyFifoScheduler(RMContext rmContext) { + super(); + try { + reinitialize(new Configuration(), new ContainerTokenSecretManager(), + rmContext); + } catch (IOException ie) { + LOG.info("add application failed with ", ie); + assert (false); + } + } + + // override this to copy the objects otherwise FifoScheduler updates the + // numContainers in same objects as kept by RMContainerAllocator + @Override + public synchronized Allocation allocate( + ApplicationAttemptId applicationAttemptId, List ask, + List release) { + List askCopy = new ArrayList(); + for (ResourceRequest req : ask) { + ResourceRequest reqCopy = BuilderUtils.newResourceRequest(req + .getPriority(), req.getHostName(), req.getCapability(), req + .getNumContainers()); + askCopy.add(reqCopy); + } + return super.allocate(applicationAttemptId, askCopy, release); + } + } + + private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, + int memory, String[] hosts) { + return createReq(jobId, taskAttemptId, memory, hosts, false, false); + } + + private ContainerRequestEvent + createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts, + boolean earlierFailedAttempt, boolean reduce) { + TaskId taskId; + if (reduce) { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); + } else { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); + } + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, + taskAttemptId); + Resource containerNeed = BuilderUtils.newResource(memory); + if (earlierFailedAttempt) { + return ContainerRequestEvent + .createContainerRequestEventForFailedContainer(attemptId, + containerNeed); + } + return new ContainerRequestEvent(attemptId, containerNeed, hosts, + new String[] { NetworkTopology.DEFAULT_RACK }); + } + + private void checkAssignments(ContainerRequestEvent[] requests, + List assignments, + boolean checkHostMatch) { + Assert.assertNotNull("Container not assigned", assignments); + Assert.assertEquals("Assigned count not correct", requests.length, + assignments.size()); + + // check for uniqueness of containerIDs + Set containerIds = new HashSet(); + for (TaskAttemptContainerAssignedEvent assigned : assignments) { + containerIds.add(assigned.getContainer().getId()); + } + Assert.assertEquals("Assigned containers must be different", assignments + .size(), containerIds.size()); + + // check for all assignment + for (ContainerRequestEvent req : requests) { + TaskAttemptContainerAssignedEvent assigned = null; + for (TaskAttemptContainerAssignedEvent ass : assignments) { + if (ass.getTaskAttemptID().equals(req.getAttemptID())) { + assigned = ass; + break; + } + } + checkAssignment(req, assigned, checkHostMatch); + } + } + + private void checkAssignment(ContainerRequestEvent request, + TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) { + Assert.assertNotNull("Nothing assigned to attempt " + + request.getAttemptID(), assigned); + Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(), + assigned.getTaskAttemptID()); + if (checkHostMatch) { + Assert.assertTrue("Not assigned to requested host", Arrays.asList( + request.getHosts()).contains( + assigned.getContainer().getNodeId().toString())); + } + } + + // Mock RMContainerAllocator + // Instead of talking to remote Scheduler,uses the local Scheduler + private static class MyContainerAllocator extends RMContainerAllocator { + static final List events + = new ArrayList(); + + private MyResourceManager rm; + + @SuppressWarnings("rawtypes") + private static AppContext createAppContext( + ApplicationAttemptId appAttemptId, Job job) { + AppContext context = mock(AppContext.class); + ApplicationId appId = appAttemptId.getApplicationId(); + when(context.getApplicationID()).thenReturn(appId); + when(context.getApplicationAttemptId()).thenReturn(appAttemptId); + when(context.getJob(isA(JobId.class))).thenReturn(job); + when(context.getEventHandler()).thenReturn(new EventHandler() { + @Override + public void handle(Event event) { + // Only capture interesting events. + if (event instanceof TaskAttemptContainerAssignedEvent) { + events.add((TaskAttemptContainerAssignedEvent) event); + } + } + }); + return context; + } + + private static ClientService createMockClientService() { + ClientService service = mock(ClientService.class); + when(service.getBindAddress()).thenReturn( + NetUtils.createSocketAddr("localhost:4567")); + when(service.getHttpPort()).thenReturn(890); + return service; + } + + MyContainerAllocator(MyResourceManager rm, Configuration conf, + ApplicationAttemptId appAttemptId, Job job) { + super(createMockClientService(), createAppContext(appAttemptId, job)); + this.rm = rm; + super.init(conf); + super.start(); + } + + @Override + protected AMRMProtocol createSchedulerProxy() { + return this.rm.getApplicationMasterService(); + } + + @Override + protected void register() { + super.register(); + } + + @Override + protected void unregister() { + } + + @Override + protected Resource getMinContainerCapability() { + return BuilderUtils.newResource(1024); + } + + @Override + protected Resource getMaxContainerCapability() { + return BuilderUtils.newResource(10240); + } + + public void sendRequest(ContainerRequestEvent req) { + sendRequests(Arrays.asList(new ContainerRequestEvent[] { req })); + } + + public void sendRequests(List reqs) { + for (ContainerRequestEvent req : reqs) { + super.handle(req); + } + } + + // API to be used by tests + public List schedule() { + // run the scheduler + try { + super.heartbeat(); + } catch (Exception e) { + LOG.error("error in heartbeat ", e); + throw new YarnException(e); + } + + List result + = new ArrayList(events); + events.clear(); + return result; + } + + protected void startAllocatorThread() { + // override to NOT start thread + } + } + + public static void main(String[] args) throws Exception { + TestRMContainerAllocator t = new TestRMContainerAllocator(); + t.testSimple(); + t.testResource(); + t.testMapReduceScheduling(); + t.testReportedAppProgress(); + t.testReportedAppProgressWithOnlyMaps(); + } } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Wed Sep 28 07:31:03 2011 @@ -19,27 +19,25 @@ package org.apache.hadoop.mapreduce.v2.util; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.Records; public class MRBuilderUtils { - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - public static JobId newJobId(ApplicationId appId, int id) { - JobId jobId = recordFactory.newRecordInstance(JobId.class); + JobId jobId = Records.newRecord(JobId.class); jobId.setAppId(appId); jobId.setId(id); return jobId; } public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) { - TaskId taskId = recordFactory.newRecordInstance(TaskId.class); + TaskId taskId = Records.newRecord(TaskId.class); taskId.setJobId(jobId); taskId.setId(id); taskId.setTaskType(taskType); @@ -48,9 +46,27 @@ public class MRBuilderUtils { public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) { TaskAttemptId taskAttemptId = - recordFactory.newRecordInstance(TaskAttemptId.class); + Records.newRecord(TaskAttemptId.class); taskAttemptId.setTaskId(taskId); taskAttemptId.setId(attemptId); return taskAttemptId; } + + public static JobReport newJobReport(JobId jobId, String jobName, + String userName, JobState state, long startTime, long finishTime, + float setupProgress, float mapProgress, float reduceProgress, + float cleanupProgress) { + JobReport report = Records.newRecord(JobReport.class); + report.setJobId(jobId); + report.setJobName(jobName); + report.setUser(userName); + report.setJobState(state); + report.setStartTime(startTime); + report.setFinishTime(finishTime); + report.setSetupProgress(setupProgress); + report.setCleanupProgress(cleanupProgress); + report.setMapProgress(mapProgress); + report.setReduceProgress(reduceProgress); + return report; + } } \ No newline at end of file Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Wed Sep 28 07:31:03 2011 @@ -66,6 +66,12 @@ org.apache.hadoop + hadoop-yarn-server-resourcemanager + test-jar + test + + + org.apache.hadoop hadoop-yarn-server-common test Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml Wed Sep 28 07:31:03 2011 @@ -90,6 +90,12 @@ org.apache.hadoop + hadoop-yarn-server-resourcemanager + ${yarn.version} + test-jar + + + org.apache.hadoop hadoop-mapreduce-client-core ${hadoop-mapreduce.version} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Wed Sep 28 07:31:03 2011 @@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.util; import java.net.URI; import java.util.Comparator; +import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -184,6 +186,13 @@ public class BuilderUtils { return id; } + public static NodeId newNodeId(String host, int port) { + NodeId nodeId = recordFactory.newRecordInstance(NodeId.class); + nodeId.setHost(host); + nodeId.setPort(port); + return nodeId; + } + public static Container newContainer(RecordFactory recordFactory, ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority) { @@ -266,5 +275,18 @@ public class BuilderUtils { url.setFile(file); return url; } - + + public static AllocateRequest newAllocateRequest( + ApplicationAttemptId applicationAttemptId, int responseID, + float appProgress, List resourceAsk, + List containersToBeReleased) { + AllocateRequest allocateRequest = recordFactory + .newRecordInstance(AllocateRequest.class); + allocateRequest.setApplicationAttemptId(applicationAttemptId); + allocateRequest.setResponseId(responseID); + allocateRequest.setProgress(appProgress); + allocateRequest.addAllAsks(resourceAsk); + allocateRequest.addAllReleases(containersToBeReleased); + return allocateRequest; + } } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Wed Sep 28 07:31:03 2011 @@ -37,6 +37,20 @@ + + + + maven-jar-plugin + + + + test-jar + + test-compile + + + + maven-antrun-plugin Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Sep 28 07:31:03 2011 @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -250,13 +251,10 @@ public class RMAppManager implements Eve if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) { - LOG.info("Application with id " + applicationId + - " is already present! Cannot add a duplicate!"); - // don't send event through dispatcher as it will be handled by app - // already present with this id. - application.handle(new RMAppRejectedEvent(applicationId, - "Application with this id is already present! " + - "Cannot add a duplicate!")); + String message = "Application with id " + applicationId + + " is already present! Cannot add a duplicate!"; + LOG.info(message); + throw RPCUtil.getRemoteException(message); } else { this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.START)); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Sep 28 07:31:03 2011 @@ -98,7 +98,7 @@ public class ResourceManager extends Com private ContainerAllocationExpirer containerAllocationExpirer; protected NMLivelinessMonitor nmLivelinessMonitor; protected NodesListManager nodesListManager; - private SchedulerEventDispatcher schedulerDispatcher; + private EventHandler schedulerDispatcher; protected RMAppManager rmAppManager; private WebApp webApp; @@ -119,7 +119,7 @@ public class ResourceManager extends Com @Override public synchronized void init(Configuration conf) { - this.rmDispatcher = new AsyncDispatcher(); + this.rmDispatcher = createDispatcher(); addIfService(this.rmDispatcher); this.containerAllocationExpirer = new ContainerAllocationExpirer( @@ -138,8 +138,8 @@ public class ResourceManager extends Com this.conf = new YarnConfiguration(conf); // Initialize the scheduler this.scheduler = createScheduler(); - this.schedulerDispatcher = new SchedulerEventDispatcher(this.scheduler); - addService(this.schedulerDispatcher); + this.schedulerDispatcher = createSchedulerEventDispatcher(); + addIfService(this.schedulerDispatcher); this.rmDispatcher.register(SchedulerEventType.class, this.schedulerDispatcher); @@ -195,6 +195,14 @@ public class ResourceManager extends Com super.init(conf); } + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler); + } + + protected Dispatcher createDispatcher() { + return new AsyncDispatcher(); + } + protected void addIfService(Object object) { if (object instanceof Service) { addService((Service) object); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Wed Sep 28 07:31:03 2011 @@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.res * look at {@link RMAppImpl} for its implementation. This interface * exposes methods to access various updates in application status/report. */ -public interface RMApp extends EventHandler{ +public interface RMApp extends EventHandler { /** * The application id for this {@link RMApp}. Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Wed Sep 28 07:31:03 2011 @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.server.res * {@link YarnConfiguration#RM_AM_MAX_RETRIES}. For specific * implementation take a look at {@link RMAppAttemptImpl}. */ -public interface RMAppAttempt extends EventHandler{ +public interface RMAppAttempt extends EventHandler { /** * Get the application attempt id for this {@link RMAppAttempt}. Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Sep 28 07:31:03 2011 @@ -685,6 +685,8 @@ public class RMAppAttemptImpl implements public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + appAttempt.progress = 1.0f; + // Tell the app and the scheduler super.transition(appAttempt, event); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java Wed Sep 28 07:31:03 2011 @@ -207,13 +207,18 @@ public class SchedulerApp { .getDispatcher().getEventHandler(), this.rmContext .getContainerAllocationExpirer()); + // Add it to allContainers list. + newlyAllocatedContainers.add(rmContainer); + liveContainers.put(container.getId(), rmContainer); + // Update consumption and track allocations - + appSchedulingInfo.allocate(type, node, priority, request, container); + Resources.addTo(currentConsumption, container.getResource()); + // Inform the container rmContainer.handle( new RMContainerEvent(container.getId(), RMContainerEventType.START)); - Resources.addTo(currentConsumption, container.getResource()); if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationAttemptId=" + container.getId().getApplicationAttemptId() @@ -223,12 +228,6 @@ public class SchedulerApp { RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, "SchedulerApp", getApplicationId(), container.getId()); - - // Add it to allContainers list. - newlyAllocatedContainers.add(rmContainer); - liveContainers.put(container.getId(), rmContainer); - - appSchedulingInfo.allocate(type, node, priority, request, container); return rmContainer; } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java Wed Sep 28 07:31:03 2011 @@ -19,10 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; import java.util.List; -import java.util.Map; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1176762&r1=1176761&r2=1176762&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Sep 28 07:31:03 2011 @@ -291,7 +291,7 @@ public class FifoScheduler implements Re @SuppressWarnings("unchecked") private synchronized void addApplication(ApplicationAttemptId appAttemptId, - String queueName, String user) { + String user) { // TODO: Fix store SchedulerApp schedulerApp = new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, @@ -628,7 +628,7 @@ public class FifoScheduler implements Re { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent - .getQueue(), appAddedEvent.getUser()); + .getUser()); } break; case APP_REMOVED: