hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r1091696 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/ mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/
Date Wed, 13 Apr 2011 08:18:49 GMT
Author: sharad
Date: Wed Apr 13 08:18:49 2011
New Revision: 1091696

URL: http://svn.apache.org/viewvc?rev=1091696&view=rev
Log:
Add HistoryCleanerService to Job History server. Contributed by Krishna Ramachandran.

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1091696&r1=1091695&r2=1091696&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Wed Apr 13 08:18:49 2011
@@ -7,6 +7,9 @@ Trunk (unreleased changes)
     Implement 'bin/mapred job -list' and 'bin/mapred job
     -list-active-trackers'. (acmurthy)
 
+    Add HistoryCleanerService to Job History server. (Krishna Ramachandran
+    via sharad)
+
   INCOMPATIBLE CHANGES
 
     MAPREDUCE-1866. Removes deprecated class

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1091696&r1=1091695&r2=1091696&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
Wed Apr 13 08:18:49 2011
@@ -47,4 +47,10 @@ public class YarnMRJobConfig {
 
   public static final String HISTORY_DONE_DIR_KEY =
        "yarn.historyfile.doneDir";
+  public static final String HISTORY_MAXAGE =
+	  "yarn.historyfile.maxage";
+  public static final String HS_WEBAPP_BIND_ADDRESS = HS_PREFIX +
+      "address.webapp";
+  public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =
+	  "0.0.0.0:19888";
 }

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java?rev=1091696&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
(added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
Wed Apr 13 08:18:49 2011
@@ -0,0 +1,142 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+public class HistoryCleanerService extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(HistoryClientService.class);
+  
+  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L;
+  private FileContext doneDirFc;
+  private HistoryCleaner historyCleanerThread = null;
+
+  private Configuration conf;
+
+  public HistoryCleanerService(Configuration conf) {
+    super("HistoryCleanerService");
+    this.conf = conf;
+  }
+
+  public void start() {
+    long maxAgeOfHistoryFiles = conf.getLong(
+        YarnMRJobConfig.HISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
+    historyCleanerThread  = new HistoryCleaner(maxAgeOfHistoryFiles);
+    historyCleanerThread.start();
+    super.start();
+  }
+
+  /** Shut down JobHistory after stopping the History cleaner */
+  @Override
+  public void stop() {
+    LOG.info("Interrupting History Cleaner");
+    historyCleanerThread.interrupt();
+    try {
+      historyCleanerThread.join();
+    } catch (InterruptedException e) {
+      LOG.info("Error with shutting down history cleaner thread");
+    }
+  }
+  /**
+   * Delete history files older than a specified time duration.
+   */
+  class HistoryCleaner extends Thread {
+    static final long ONE_DAY_IN_MS = 7 * 24 * 60 * 60 * 1000L;
+    private long cleanupFrequency;
+    private long maxAgeOfHistoryFiles;
+
+    public HistoryCleaner(long maxAge) {
+      setName("Thread for cleaning up History files");
+      setDaemon(true);
+      this.maxAgeOfHistoryFiles = maxAge;
+      cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles);
+      LOG.info("Job History Cleaner Thread started." +
+          " MaxAge is " + 
+          maxAge + " ms(" + ((float)maxAge)/(ONE_DAY_IN_MS) + " days)," +
+          " Cleanup Frequency is " +
+          + cleanupFrequency + " ms (" +
+          ((float)cleanupFrequency)/ONE_DAY_IN_MS + " days)");
+    }
+
+    @Override
+    public void run(){
+  
+      while (true) {
+        try {
+          doCleanup(); 
+          Thread.sleep(cleanupFrequency);
+        }
+        catch (InterruptedException e) {
+          LOG.info("History Cleaner thread exiting");
+          return;
+        }
+        catch (Throwable t) {
+          LOG.warn("History cleaner thread threw an exception", t);
+        }
+      }
+    }
+
+    private void doCleanup() {
+      long now = System.currentTimeMillis();
+      try {
+        String defaultDoneDir = conf.get(
+            YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
+        String  jobhistoryDir =
+          conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY, defaultDoneDir);
+        Path done = FileContext.getFileContext(conf).makeQualified(
+            new Path(jobhistoryDir));
+        doneDirFc = FileContext.getFileContext(done.toUri(), conf);
+        RemoteIterator<LocatedFileStatus> historyFiles =
+          doneDirFc.util().listFiles(done, true);
+        if (historyFiles != null) {
+          FileStatus f;
+          while (historyFiles.hasNext()) {
+            f = historyFiles.next();
+            if (now - f.getModificationTime() > maxAgeOfHistoryFiles) {
+              doneDirFc.delete(f.getPath(), true); 
+              LOG.info("Deleting old history file : " + f.getPath());
+            }
+          }
+        }
+      } catch (IOException ie) {
+        LOG.info("Error cleaning up history directory" + 
+            StringUtils.stringifyException(ie));
+      }
+    }
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1091696&r1=1091695&r2=1091696&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
Wed Apr 13 08:18:49 2011
@@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.service.Co
  *****************************************************************/
 public class JobHistoryServer extends CompositeService {
   private static final Log LOG = LogFactory.getLog(JobHistoryServer.class);
+  private HistoryClientService clientService;
+  private HistoryCleanerService cleanerService;
 
   static{
     Configuration.addDefaultResource("mapred-default.xml");
@@ -46,15 +48,18 @@ public class JobHistoryServer extends Co
   public synchronized void init(Configuration conf) {
     Configuration config = new YarnConfiguration(conf);
     HistoryContext history = new JobHistory(conf);
-    addService(new HistoryClientService(history));
-    //TODO: add HistoryCleaner service
+    clientService = new HistoryClientService(history);
+    cleanerService = new HistoryCleanerService(config);
+    addService(clientService);
+    addService(cleanerService);
     super.init(config);
   }
 
   public static void main(String[] args) {
     StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
+    JobHistoryServer server = null;
     try {
-      JobHistoryServer server = new JobHistoryServer();
+      server = new JobHistoryServer();
       YarnConfiguration conf = new YarnConfiguration(new JobConf());
       server.init(conf);
       server.start();



Mime
View raw message