hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject svn commit: r1506226 - in /hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src: main/java/org/apache/hadoop/yarn/logaggregation/ test/java/org/apache/hadoop/yarn/logaggregation/
Date Tue, 23 Jul 2013 19:20:58 GMT
Author: jlowe
Date: Tue Jul 23 19:20:57 2013
New Revision: 1506226

URL: http://svn.apache.org/r1506226
Log:
MAPREDUCE-5356. Ability to refresh aggregated log retention period and check interval. Contributed by Ashwin Shankar

Modified:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java?rev=1506226&r1=1506225&r2=1506226&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java Tue Jul 23 19:20:57 2013
@@ -41,6 +41,7 @@ public class AggregatedLogDeletionServic
   private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
   
   private Timer timer = null;
+  private long checkIntervalMsecs;
   
   static class LogDeletionTask extends TimerTask {
     private Configuration conf;
@@ -133,37 +134,71 @@ public class AggregatedLogDeletionServic
 
   @Override
   protected void serviceStart() throws Exception {
+    scheduleLogDeletionTask();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    stopTimer();
+    super.serviceStop();
+  }
+  
+  private void setLogAggCheckIntervalMsecs(long retentionSecs) {
+    Configuration conf = getConfig();
+    checkIntervalMsecs = 1000 * conf
+        .getLong(
+            YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
+    if (checkIntervalMsecs <= 0) {
+      // when unspecified compute check interval as 1/10th of retention
+      checkIntervalMsecs = (retentionSecs * 1000) / 10;
+    }
+  }
+  
+  public void refreshLogRetentionSettings() {
+    if (getServiceState() == STATE.STARTED) {
+      Configuration conf = createConf();
+      setConfig(conf);
+      stopTimer();
+      scheduleLogDeletionTask();
+    } else {
+      LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log Deletion Service is not started");
+    }
+  }
+  
+  private void scheduleLogDeletionTask() {
     Configuration conf = getConfig();
     if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
         YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
-      //Log aggregation is not enabled so don't bother
+      // Log aggregation is not enabled so don't bother
       return;
     }
-    long retentionSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
+    long retentionSecs = conf.getLong(
+        YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
         YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
-    if(retentionSecs < 0) {
-      LOG.info("Log Aggregation deletion is disabled because retention is" +
-      		" too small (" + retentionSecs + ")");
+    if (retentionSecs < 0) {
+      LOG.info("Log Aggregation deletion is disabled because retention is"
+          + " too small (" + retentionSecs + ")");
       return;
     }
-    long checkIntervalMsecs = 1000 * conf.getLong(
-        YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
-        YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
-    if (checkIntervalMsecs <= 0) {
-      // when unspecified compute check interval as 1/10th of retention
-      checkIntervalMsecs = (retentionSecs * 1000) / 10;
-    }
+    setLogAggCheckIntervalMsecs(retentionSecs);
     TimerTask task = new LogDeletionTask(conf, retentionSecs);
     timer = new Timer();
     timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
-    super.serviceStart();
   }
 
-  @Override
-  protected void serviceStop() throws Exception {
-    if(timer != null) {
+  private void stopTimer() {
+    if (timer != null) {
       timer.cancel();
     }
-    super.serviceStop();
+  }
+  
+  public long getCheckIntervalMsecs() {
+    return checkIntervalMsecs;
+  }
+
+  protected Configuration createConf() {
+    return new Configuration();
   }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java?rev=1506226&r1=1506225&r2=1506226&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java Tue Jul 23 19:20:57 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.Assert;
 
 import static org.mockito.Mockito.*;
 
@@ -129,6 +130,99 @@ public class TestAggregatedLogDeletionSe
   }
 
   @Test
+  public void testRefreshLogRetentionSettings() throws IOException {
+    long now = System.currentTimeMillis();
+    //time before 2000 sec
+    long before2000Secs = now - (2000 * 1000);
+    //time before 50 sec
+    long before50Secs = now - (50 * 1000);
+    String root = "mockfs://foo/";
+    String remoteRootLogDir = root + "tmp/logs";
+    String suffix = "logs";
+    final Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+        "1");
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
+
+    Path rootPath = new Path(root);
+    FileSystem rootFs = rootPath.getFileSystem(conf);
+    FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
+
+    Path remoteRootLogPath = new Path(remoteRootLogDir);
+
+    Path userDir = new Path(remoteRootLogPath, "me");
+    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
+        userDir);
+
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
+        new FileStatus[] { userDirStatus });
+
+    Path userLogDir = new Path(userDir, suffix);
+
+    //Set time last modified of app1Dir directory and its files to before2000Secs 
+    Path app1Dir = new Path(userLogDir, "application_1_1");
+    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
+        app1Dir);
+    
+    //Set time last modified of app2Dir directory and its files to before50Secs
+    Path app2Dir = new Path(userLogDir, "application_1_2");
+    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
+        app2Dir);
+
+    when(mockFs.listStatus(userLogDir)).thenReturn(
+        new FileStatus[] { app1DirStatus, app2DirStatus });
+
+    Path app1Log1 = new Path(app1Dir, "host1");
+    FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
+        app1Log1);
+
+    when(mockFs.listStatus(app1Dir)).thenReturn(
+        new FileStatus[] { app1Log1Status });
+
+    Path app2Log1 = new Path(app2Dir, "host1");
+    FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
+        app2Log1);
+
+    when(mockFs.listStatus(app2Dir)).thenReturn(
+        new FileStatus[] { app2Log1Status });
+
+    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
+      @Override
+      protected Configuration createConf() {
+        return conf;
+      }
+    };
+    
+    deletionSvc.init(conf);
+    deletionSvc.start();
+    
+    //app1Dir would be deleted since its done above log retention period
+    verify(mockFs, timeout(10000)).delete(app1Dir, true);
+    //app2Dir is not expected to be deleted since its below the threshold
+    verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
+
+    //Now,lets change the confs
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+        "2");
+    //We have not called refreshLogSettings,hence don't expect to see the changed conf values
+    Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
+    
+    //refresh the log settings
+    deletionSvc.refreshLogRetentionSettings();
+
+    //Check interval time should reflect the new value
+    Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
+    //app2Dir should be deleted since it falls above the threshold
+    verify(mockFs, timeout(10000)).delete(app2Dir, true);
+    deletionSvc.stop();
+  }
+  
+  @Test
   public void testCheckInterval() throws Exception {
     long RETENTION_SECS = 10 * 24 * 3600;
     long now = System.currentTimeMillis();
@@ -176,7 +270,7 @@ public class TestAggregatedLogDeletionSe
         new AggregatedLogDeletionService();
     deletionSvc.init(conf);
     deletionSvc.start();
-
+ 
     verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
     verify(mockFs, never()).delete(app1Dir, true);
 



Mime
View raw message