Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 820FA1013C for ; Tue, 23 Jul 2013 19:25:57 +0000 (UTC) Received: (qmail 99522 invoked by uid 500); 23 Jul 2013 19:25:48 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 99232 invoked by uid 500); 23 Jul 2013 19:25:46 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 99147 invoked by uid 99); 23 Jul 2013 19:25:45 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Jul 2013 19:25:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Jul 2013 19:25:41 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 70D0B2388AB8; Tue, 23 Jul 2013 19:25:20 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1506230 - in /hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src: main/java/org/apache/hadoop/yarn/logaggregation/ test/java/org/apache/hadoop/yarn/logaggregation/ Date: Tue, 23 Jul 2013 19:25:20 -0000 To: yarn-commits@hadoop.apache.org From: jlowe@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130723192520.70D0B2388AB8@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jlowe Date: Tue Jul 23 19:25:19 2013 New Revision: 1506230 URL: http://svn.apache.org/r1506230 Log: svn merge -c 1506226 FIXES: MAPREDUCE-5356. Ability to refresh aggregated log retention period and check interval. Contributed by Ashwin Shankar Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java?rev=1506230&r1=1506229&r2=1506230&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java Tue Jul 23 19:25:19 2013 @@ -41,6 +41,7 @@ public class AggregatedLogDeletionServic private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class); private Timer timer = null; + private long checkIntervalMsecs; static class LogDeletionTask extends TimerTask { private Configuration conf; @@ -133,37 +134,71 @@ public class AggregatedLogDeletionServic @Override protected void serviceStart() throws Exception { + scheduleLogDeletionTask(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + stopTimer(); + super.serviceStop(); + } + + private void setLogAggCheckIntervalMsecs(long retentionSecs) { + Configuration conf = getConfig(); + checkIntervalMsecs = 1000 * conf + .getLong( + YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS); + if (checkIntervalMsecs <= 0) { + // when unspecified compute check interval as 1/10th of retention + checkIntervalMsecs = (retentionSecs * 1000) / 10; + } + } + + public void refreshLogRetentionSettings() { + if (getServiceState() == STATE.STARTED) { + Configuration conf = createConf(); + setConfig(conf); + stopTimer(); + scheduleLogDeletionTask(); + } else { + LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log Deletion Service is not started"); + } + } + + private void scheduleLogDeletionTask() { Configuration conf = getConfig(); if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { - //Log aggregation is not enabled so don't bother + // Log aggregation is not enabled so don't bother return; } - long retentionSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, + long retentionSecs = conf.getLong( + YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS); - if(retentionSecs < 0) { - LOG.info("Log Aggregation deletion is disabled because retention is" + - " too small (" + retentionSecs + ")"); + if (retentionSecs < 0) { + LOG.info("Log Aggregation deletion is disabled because retention is" + + " too small (" + retentionSecs + ")"); return; } - long checkIntervalMsecs = 1000 * conf.getLong( - YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS); - if (checkIntervalMsecs <= 0) { - // when unspecified compute check interval as 1/10th of retention - checkIntervalMsecs = (retentionSecs * 1000) / 10; - } + setLogAggCheckIntervalMsecs(retentionSecs); TimerTask task = new LogDeletionTask(conf, retentionSecs); timer = new Timer(); timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); - super.serviceStart(); } - @Override - protected void serviceStop() throws Exception { - if(timer != null) { + private void stopTimer() { + if (timer != null) { timer.cancel(); } - super.serviceStop(); + } + + public long getCheckIntervalMsecs() { + return checkIntervalMsecs; + } + + protected Configuration createConf() { + return new Configuration(); } } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java?rev=1506230&r1=1506229&r2=1506230&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java Tue Jul 23 19:25:19 2013 @@ -30,6 +30,7 @@ import org.apache.hadoop.security.Access import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Before; import org.junit.Test; +import org.junit.Assert; import static org.mockito.Mockito.*; @@ -129,6 +130,99 @@ public class TestAggregatedLogDeletionSe } @Test + public void testRefreshLogRetentionSettings() throws IOException { + long now = System.currentTimeMillis(); + //time before 2000 sec + long before2000Secs = now - (2000 * 1000); + //time before 50 sec + long before50Secs = now - (50 * 1000); + String root = "mockfs://foo/"; + String remoteRootLogDir = root + "tmp/logs"; + String suffix = "logs"; + final Configuration conf = new Configuration(); + conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); + conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, + "1"); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + + Path rootPath = new Path(root); + FileSystem rootFs = rootPath.getFileSystem(conf); + FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem(); + + Path remoteRootLogPath = new Path(remoteRootLogDir); + + Path userDir = new Path(remoteRootLogPath, "me"); + FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs, + userDir); + + when(mockFs.listStatus(remoteRootLogPath)).thenReturn( + new FileStatus[] { userDirStatus }); + + Path userLogDir = new Path(userDir, suffix); + + //Set time last modified of app1Dir directory and its files to before2000Secs + Path app1Dir = new Path(userLogDir, "application_1_1"); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs, + app1Dir); + + //Set time last modified of app1Dir directory and its files to before50Secs + Path app2Dir = new Path(userLogDir, "application_1_2"); + FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs, + app2Dir); + + when(mockFs.listStatus(userLogDir)).thenReturn( + new FileStatus[] { app1DirStatus, app2DirStatus }); + + Path app1Log1 = new Path(app1Dir, "host1"); + FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs, + app1Log1); + + when(mockFs.listStatus(app1Dir)).thenReturn( + new FileStatus[] { app1Log1Status }); + + Path app2Log1 = new Path(app2Dir, "host1"); + FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs, + app2Log1); + + when(mockFs.listStatus(app2Dir)).thenReturn( + new FileStatus[] { app2Log1Status }); + + AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() { + @Override + protected Configuration createConf() { + return conf; + } + }; + + deletionSvc.init(conf); + deletionSvc.start(); + + //app1Dir would be deleted since its done above log retention period + verify(mockFs, timeout(10000)).delete(app1Dir, true); + //app2Dir is not expected to be deleted since its below the threshold + verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true); + + //Now,lets change the confs + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50"); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, + "2"); + //We have not called refreshLogSettings,hence don't expect to see the changed conf values + Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs()); + + //refresh the log settings + deletionSvc.refreshLogRetentionSettings(); + + //Check interval time should reflect the new value + Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs()); + //app2Dir should be deleted since it falls above the threshold + verify(mockFs, timeout(10000)).delete(app2Dir, true); + deletionSvc.stop(); + } + + @Test public void testCheckInterval() throws Exception { long RETENTION_SECS = 10 * 24 * 3600; long now = System.currentTimeMillis(); @@ -176,7 +270,7 @@ public class TestAggregatedLogDeletionSe new AggregatedLogDeletionService(); deletionSvc.init(conf); deletionSvc.start(); - + verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class)); verify(mockFs, never()).delete(app1Dir, true);