Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D0D8D9047 for ; Wed, 11 Apr 2012 17:10:22 +0000 (UTC) Received: (qmail 17126 invoked by uid 500); 11 Apr 2012 17:10:22 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 17053 invoked by uid 500); 11 Apr 2012 17:10:21 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 17035 invoked by uid 99); 11 Apr 2012 17:10:21 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Apr 2012 17:10:21 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Apr 2012 17:10:17 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4976F238899C; Wed, 11 Apr 2012 17:09:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1324866 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apac... Date: Wed, 11 Apr 2012 17:09:54 -0000 To: mapreduce-commits@hadoop.apache.org From: sseth@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120411170955.4976F238899C@eris.apache.org> Author: sseth Date: Wed Apr 11 17:09:54 2012 New Revision: 1324866 URL: http://svn.apache.org/viewvc?rev=1324866&view=rev Log: MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory after the history service is stopped. (Contributed by Jason Lowe) Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1324866&r1=1324865&r2=1324866&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Apr 11 17:09:54 2012 @@ -310,6 +310,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4040. History links should use hostname rather than IP address. (Bhallamudi Venkata Siva Kamesh via sseth) + MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory + after the history service is stopped. (Jason Lowe via sseth) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1324866&r1=1324865&r2=1324866&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Apr 11 17:09:54 2012 @@ -285,6 +285,11 @@ public class MRAppMaster extends Composi addIfService(containerLauncher); dispatcher.register(ContainerLauncher.EventType.class, containerLauncher); + // Add the staging directory cleaner before the history server but after + // the container allocator so the staging directory is cleaned after + // the history has been flushed but before unregistering with the RM. + addService(createStagingDirCleaningService()); + // Add the JobHistoryEventHandler last so that it is properly stopped first. // This will guarantee that all history-events are flushed before AM goes // ahead with shutdown. @@ -406,13 +411,6 @@ public class MRAppMaster extends Composi e.printStackTrace(); } - // Cleanup staging directory - try { - cleanupStagingDir(); - } catch(IOException io) { - LOG.warn("Failed to delete staging dir", io); - } - try { // Stop all services // This will also send the final report to the ResourceManager @@ -512,6 +510,10 @@ public class MRAppMaster extends Composi return this.jobHistoryEventHandler; } + protected AbstractService createStagingDirCleaningService() { + return new StagingDirCleaningService(); + } + protected Speculator createSpeculator(Configuration conf, AppContext context) { Class speculatorClass; @@ -710,6 +712,22 @@ public class MRAppMaster extends Composi } } + private final class StagingDirCleaningService extends AbstractService { + StagingDirCleaningService() { + super(StagingDirCleaningService.class.getName()); + } + + @Override + public synchronized void stop() { + try { + cleanupStagingDir(); + } catch (IOException io) { + LOG.error("Failed to cleanup staging dir: ", io); + } + super.stop(); + } + } + private class RunningAppContext implements AppContext { private final Map jobs = new ConcurrentHashMap(); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1324866&r1=1324865&r2=1324866&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Wed Apr 11 17:09:54 2012 @@ -428,9 +428,13 @@ public class MRApp extends MRAppMaster { @Override protected ContainerAllocator createContainerAllocator( ClientService clientService, final AppContext context) { - return new ContainerAllocator(){ - private int containerCount; - @Override + return new MRAppContainerAllocator(); + } + + protected class MRAppContainerAllocator implements ContainerAllocator { + private int containerCount; + + @Override public void handle(ContainerAllocatorEvent event) { ContainerId cId = recordFactory.newRecordInstance(ContainerId.class); cId.setApplicationAttemptId(getContext().getApplicationAttemptId()); @@ -452,7 +456,6 @@ public class MRApp extends MRAppMaster { new TaskAttemptContainerAssignedEvent(event.getAttemptID(), container, null)); } - }; } @Override Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1324866&r1=1324865&r2=1324866&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Wed Apr 11 17:09:54 2012 @@ -18,11 +18,10 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.spy; -import java.io.IOException; import java.util.Iterator; import junit.framework.Assert; @@ -36,14 +35,11 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; -import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.YarnException; import org.junit.Test; /** @@ -237,71 +233,6 @@ public class TestMRApp { } } - private final class MRAppTestCleanup extends MRApp { - boolean hasStopped; - boolean cleanedBeforeStopped; - - public MRAppTestCleanup(int maps, int reduces, boolean autoComplete, - String testName, boolean cleanOnStart) { - super(maps, reduces, autoComplete, testName, cleanOnStart); - hasStopped = false; - cleanedBeforeStopped = false; - } - - @Override - protected Job createJob(Configuration conf) { - UserGroupInformation currentUser = null; - try { - currentUser = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new YarnException(e); - } - Job newJob = new TestJob(getJobId(), getAttemptID(), conf, - getDispatcher().getEventHandler(), - getTaskAttemptListener(), getContext().getClock(), - getCommitter(), isNewApiCommitter(), - currentUser.getUserName(), getContext()); - ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); - - getDispatcher().register(JobFinishEvent.Type.class, - createJobFinishEventHandler()); - - return newJob; - } - - @Override - public void cleanupStagingDir() throws IOException { - cleanedBeforeStopped = !hasStopped; - } - - @Override - public synchronized void stop() { - hasStopped = true; - super.stop(); - } - - @Override - protected void sysexit() { - } - } - - @Test - public void testStagingCleanupOrder() throws Exception { - MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, - this.getClass().getName(), true); - JobImpl job = (JobImpl)app.submit(new Configuration()); - app.waitForState(job, JobState.SUCCEEDED); - app.verifyCompleted(); - - int waitTime = 20 * 1000; - while (waitTime > 0 && !app.cleanedBeforeStopped) { - Thread.sleep(100); - waitTime -= 100; - } - Assert.assertTrue("Staging directory not cleaned before notifying RM", - app.cleanedBeforeStopped); - } - public static void main(String[] args) throws Exception { TestMRApp t = new TestMRApp(); t.testMapReduce(); @@ -310,6 +241,5 @@ public class TestMRApp { t.testCompletedMapsForReduceSlowstart(); t.testJobError(); t.testCountersOnJobFinish(); - t.testStagingCleanupOrder(); } } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1324866&r1=1324865&r2=1324866&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Wed Apr 11 17:09:54 2012 @@ -26,6 +26,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; +import junit.framework.Assert; import junit.framework.TestCase; import org.apache.commons.logging.Log; @@ -35,12 +36,21 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; @@ -103,4 +113,89 @@ import org.junit.Test; } } + private final class MRAppTestCleanup extends MRApp { + boolean stoppedContainerAllocator; + boolean cleanedBeforeContainerAllocatorStopped; + + public MRAppTestCleanup(int maps, int reduces, boolean autoComplete, + String testName, boolean cleanOnStart) { + super(maps, reduces, autoComplete, testName, cleanOnStart); + stoppedContainerAllocator = false; + cleanedBeforeContainerAllocatorStopped = false; + } + + @Override + protected Job createJob(Configuration conf) { + UserGroupInformation currentUser = null; + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new YarnException(e); + } + Job newJob = new TestJob(getJobId(), getAttemptID(), conf, + getDispatcher().getEventHandler(), + getTaskAttemptListener(), getContext().getClock(), + getCommitter(), isNewApiCommitter(), + currentUser.getUserName(), getContext()); + ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); + + getDispatcher().register(JobFinishEvent.Type.class, + createJobFinishEventHandler()); + + return newJob; + } + + @Override + protected ContainerAllocator createContainerAllocator( + ClientService clientService, AppContext context) { + return new TestCleanupContainerAllocator(); + } + + private class TestCleanupContainerAllocator extends AbstractService + implements ContainerAllocator { + private MRAppContainerAllocator allocator; + + TestCleanupContainerAllocator() { + super(TestCleanupContainerAllocator.class.getName()); + allocator = new MRAppContainerAllocator(); + } + + @Override + public void handle(ContainerAllocatorEvent event) { + allocator.handle(event); + } + + @Override + public synchronized void stop() { + stoppedContainerAllocator = true; + super.stop(); + } + } + + @Override + public void cleanupStagingDir() throws IOException { + cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator; + } + + @Override + protected void sysexit() { + } + } + + @Test + public void testStagingCleanupOrder() throws Exception { + MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, + this.getClass().getName(), true); + JobImpl job = (JobImpl)app.submit(new Configuration()); + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + + int waitTime = 20 * 1000; + while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) { + Thread.sleep(100); + waitTime -= 100; + } + Assert.assertTrue("Staging directory not cleaned before notifying RM", + app.cleanedBeforeContainerAllocatorStopped); + } } \ No newline at end of file