hbase-commits mailing list archives

From: la...@apache.org
Subject: svn commit: r1521662 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase: ./ master/cleaner/ replication/master/ util/
Date: Tue, 10 Sep 2013 21:46:34 GMT
Author: larsh
Date: Tue Sep 10 21:46:33 2013
New Revision: 1521662

URL: http://svn.apache.org/r1521662
Log:
Backport HBASE-9208 - ReplicationLogCleaner slow at large scale (Dave Latham)
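
The heart of this backport is a contract change from a per-file check to a batch check, so that
expensive state (here, the set of hlogs still referenced in ZooKeeper) is loaded once per cleaner
pass instead of once per file. A minimal sketch of the call-shape change, where delete(f) is a
hypothetical stand-in for the actual deletion path:

    // Before: one delegate call per file; each miss could rescan ZooKeeper.
    for (FileStatus f : files) {
      if (cleaner.isFileDeletable(f)) delete(f);
    }

    // After: one delegate call per batch; the delegate filters the whole set at once.
    for (FileStatus f : cleaner.getDeletableFiles(files)) {
      delete(f);
    }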

Added:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java?rev=1521662&r1=1521661&r2=1521662&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java Tue Sep 10 21:46:33 2013
@@ -56,22 +56,22 @@ public abstract class HBaseFileSystem {
   }
 
   /**
-   * Deletes a file. Assumes the user has already checked for this directory existence.
+   * Deletes a file. Assumes the user has already checked for this file's existence.
    * @param fs
-   * @param dir
-   * @return true if the directory is deleted.
+   * @param file
+   * @return true if the file is deleted.
    * @throws IOException
    */
-  public static boolean deleteFileFromFileSystem(FileSystem fs, Path dir)
+  public static boolean deleteFileFromFileSystem(FileSystem fs, Path file)
       throws IOException {
     IOException lastIOE = null;
     int i = 0;
     do {
       try {
-        return fs.delete(dir, false);
+        return fs.delete(file, false);
       } catch (IOException ioe) {
         lastIOE = ioe;
-        if (!fs.exists(dir)) return true;
+        if (!fs.exists(file)) return true;
         // dir is there, retry deleting after some time.
         sleepBeforeRetry("Delete File", i + 1);
       }
@@ -81,7 +81,7 @@ public abstract class HBaseFileSystem {
   
   
   /**
-   * Deletes a directory. Assumes the user has already checked for this directory existence.
+   * Deletes a directory. Assumes the user has already checked for this directory's existence.
    * @param fs
    * @param dir
    * @return true if the directory is deleted.
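
The hunk above corrects the javadoc and renames the parameter of the single-file delete helper;
the surrounding retry loop (its bound lies outside this hunk) follows a common HDFS idiom worth
spelling out. A sketch of that idiom, with retries standing in for the configured retry count:

    IOException lastIOE = null;
    int i = 0;
    do {
      try {
        return fs.delete(file, false);      // non-recursive: deletes the single file
      } catch (IOException ioe) {
        lastIOE = ioe;
        if (!fs.exists(file)) return true;  // file already gone: treat as success
        sleepBeforeRetry("Delete File", i + 1);
      }
    } while (++i <= retries);               // bound not shown in this hunk
    throw lastIOE;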

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java?rev=1521662&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java Tue Sep 10 21:46:33 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.BaseConfigurable;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * Base class for file cleaners which allows subclasses to implement a simple
+ * isFileDeletable method (which used to be the FileCleanerDelegate contract).
+ */
+public abstract class BaseFileCleanerDelegate extends BaseConfigurable
+implements FileCleanerDelegate {
+
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    return Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        return isFileDeletable(file);
+      }});
+  }
+
+  /**
+   * Should the master delete the file or keep it?
+   * @param fStat file status of the file to check
+   * @return <tt>true</tt> if the file is deletable, <tt>false</tt> if not
+   */
+  protected abstract boolean isFileDeletable(FileStatus fStat);
+
+}
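
With this base class a concrete delegate only implements the single-file hook, and the batch
interface is satisfied via Guava's lazy Iterables.filter. A hypothetical subclass (not part of
this commit) showing the intended usage; the stop/isStopped stubs satisfy the Stoppable side of
FileCleanerDelegate:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hbase.master.cleaner.BaseFileCleanerDelegate;

    /** Hypothetical example: delete anything untouched for more than an hour. */
    public class ExampleTtlCleaner extends BaseFileCleanerDelegate {
      private static final long TTL_MS = 60 * 60 * 1000L;
      private volatile boolean stopped = false;

      @Override
      protected boolean isFileDeletable(FileStatus fStat) {
        return System.currentTimeMillis() - fStat.getModificationTime() > TTL_MS;
      }

      @Override
      public void stop(String why) { this.stopped = true; }

      @Override
      public boolean isStopped() { return stopped; }
    }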

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java?rev=1521662&r1=1521661&r2=1521662&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java Tue Sep 10 21:46:33 2013
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.BaseConfigurable;
 
 /**
  * Base class for the hfile cleaning function inside the master. By default, only the
@@ -36,8 +35,7 @@ import org.apache.hadoop.hbase.BaseConfi
  * provide a default constructor.
  */
 @InterfaceAudience.Private
-public abstract class BaseHFileCleanerDelegate extends BaseConfigurable implements
-    FileCleanerDelegate {
+public abstract class BaseHFileCleanerDelegate extends BaseFileCleanerDelegate {
 
   private boolean stopped = false;
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java?rev=1521662&r1=1521661&r2=1521662&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java Tue Sep 10 21:46:33 2013
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.master.c
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.BaseConfigurable;
 
 /**
  * Base class for the log cleaning function inside the master. By default, two
@@ -38,7 +36,7 @@ import org.apache.hadoop.hbase.BaseConfi
  * provide a default constructor.
  */
 @InterfaceAudience.Private
-public abstract class BaseLogCleanerDelegate extends BaseConfigurable implements FileCleanerDelegate {
+public abstract class BaseLogCleanerDelegate extends BaseFileCleanerDelegate {
 
   @Override
   public boolean isFileDeletable(FileStatus fStat) {
@@ -50,8 +48,10 @@ public abstract class BaseLogCleanerDele
    * <p>
    * Implementing classes should override {@link #isFileDeletable(FileStatus)} instead.
    * @param fStat file status of the file
-   * @return true if the log is deletable, false if not
+   * @return true if the log is deletable, false (default) if not
    */
   @Deprecated
-  public abstract boolean isLogDeletable(FileStatus fStat);
+  public boolean isLogDeletable(FileStatus fStat) {
+    return false;
+  }
 }
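
Making isLogDeletable concrete with a default of false (rather than abstract) is a
source-compatibility bridge: existing log cleaners that override only the deprecated hook keep
compiling, while isFileDeletable (whose body is elided between these hunks and presumably
forwards to the deprecated method) routes the batch filtering through it. A hypothetical legacy
subclass that still works unchanged under this change:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;

    /** Hypothetical legacy delegate overriding only the deprecated hook. */
    public class LegacyLogCleaner extends BaseLogCleanerDelegate {
      @Override
      public boolean isLogDeletable(FileStatus fStat) {
        return true;  // illustration only: treat every old log as deletable
      }

      @Override
      public void stop(String why) {}  // stub; assumes the base class leaves Stoppable abstract

      @Override
      public boolean isStopped() { return false; }
    }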

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java?rev=1521662&r1=1521661&r2=1521662&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java Tue Sep 10 21:46:33 2013
@@ -33,6 +33,11 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.util.FSUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 /**
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
@@ -74,7 +79,7 @@ public abstract class CleanerChore<T ext
   protected abstract boolean validate(Path file);
 
   /**
-   * Instanitate and initialize all the file cleaners set in the configuration
+   * Instantiate and initialize all the file cleaners set in the configuration
    * @param confKey key to get the file cleaner classes from the configuration
    */
   private void initCleanerChain(String confKey) {
@@ -98,7 +103,7 @@ public abstract class CleanerChore<T ext
    * @param conf
    * @return the new instance
    */
-  public T newFileCleaner(String className, Configuration conf) {
+  private T newFileCleaner(String className, Configuration conf) {
     try {
       Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
         FileCleanerDelegate.class);
@@ -116,131 +121,153 @@ public abstract class CleanerChore<T ext
   @Override
   protected void chore() {
     try {
-      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir, null);
-      // if the path (file or directory) doesn't exist, then we can just return
-      if (files == null) return;
-      // loop over the found files and see if they should be deleted
-      for (FileStatus file : files) {
-        try {
-          if (file.isDir()) checkAndDeleteDirectory(file.getPath());
-          else checkAndDelete(file);
-        } catch (IOException e) {
-          e = RemoteExceptionHandler.checkIOException(e);
-          LOG.warn("Error while cleaning the logs", e);
-        }
-      }
+      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
+      checkAndDeleteEntries(files);
     } catch (IOException e) {
-      LOG.warn("Failed to get status of:" + oldFileDir);
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while cleaning the logs", e);
     }
-
   }
 
   /**
+   * Loop over the given directory entries, and check whether they can be deleted.
+   * If an entry is itself a directory it will be recursively checked and deleted itself iff
+   * all subentries are deleted (and no new subentries are added in the mean time)
+   *
+   * @param entries directory entries to check
+   * @return true if all entries were successfully deleted
+   */
+  private boolean checkAndDeleteEntries(FileStatus[] entries) {
+    if (entries == null) {
+      return true;
+    }
+    boolean allEntriesDeleted = true;
+    List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
+    for (FileStatus child : entries) {
+      Path path = child.getPath();
+      if (child.isDir()) {
+        // for each subdirectory delete it and all entries if possible
+        if (!checkAndDeleteDirectory(path)) {
+          allEntriesDeleted = false;
+        }
+      } else {
+        // collect all files to attempt to delete in one batch
+        files.add(child);
+      }
+    }
+    if (!checkAndDeleteFiles(files)) {
+      allEntriesDeleted = false;
+    }
+    return allEntriesDeleted;
+  }
+  
+  /**
   * Attempt to delete a directory and all files under that directory. Each child file is passed
-   * through the delegates to see if it can be deleted. If the directory has not children when the
+   * through the delegates to see if it can be deleted. If the directory has no children when the
    * cleaners have finished it is deleted.
    * <p>
   * If new children files are added between checks of the directory, the directory will <b>not</b>
    * be deleted.
-   * @param toCheck directory to check
+   * @param dir directory to check
   * @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
-   * @throws IOException if there is an unexpected filesystem error
    */
-  public boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
+  @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Checking directory: " + toCheck);
+      LOG.trace("Checking directory: " + dir);
     }
-    FileStatus[] children = FSUtils.listStatus(fs, toCheck, null);
-    // if the directory doesn't exist, then we are done
-    if (children == null) {
-      try {
-        return HBaseFileSystem.deleteFileFromFileSystem(fs, toCheck);
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Couldn't delete directory: " + toCheck, e);
-        }
-      }
-      // couldn't delete w/o exception, so we can't return success.
-      return false;
-    }     
 
-    boolean canDeleteThis = true;
-    for (FileStatus child : children) {
-      Path path = child.getPath();
-      // attempt to delete all the files under the directory
-      if (child.isDir()) {
-        if (!checkAndDeleteDirectory(path)) {
-          canDeleteThis = false;
-        }
-      }
-      // otherwise we can just check the file
-      else if (!checkAndDelete(child)) {
-        canDeleteThis = false;
-      }
+    try {
+      FileStatus[] children = FSUtils.listStatus(fs, dir);
+      boolean allChildrenDeleted = checkAndDeleteEntries(children);
+  
+      // if the directory still has children, we can't delete it, so we are done
+      if (!allChildrenDeleted) return false;
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while listing directory: " + dir, e);
+      // couldn't list directory, so don't try to delete, and don't return success
+      return false;
     }
 
-    // if the directory has children, we can't delete it, so we are done
-    if (!canDeleteThis) return false;
-
     // otherwise, all the children (that we know about) have been deleted, so we should try to
     // delete this directory. However, don't do so recursively so we don't delete files that have
     // been added since we last checked.
     try {
-      return HBaseFileSystem.deleteFileFromFileSystem(fs, toCheck);
+      return HBaseFileSystem.deleteFileFromFileSystem(fs, dir);
     } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Couldn't delete directory: " + toCheck, e);
+        LOG.trace("Couldn't delete directory: " + dir, e);
       }
+      // couldn't delete w/o exception, so we can't return success.
+      return false;
     }
-
-    // couldn't delete w/o exception, so we can't return success.
-    return false;
   }
 
   /**
-   * Run the given file through each of the cleaners to see if it should be deleted, deleting it if
+   * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
    * necessary.
-   * @param fStat path of the file to check (and possibly delete)
-   * @throws IOException if cann't delete a file because of a filesystem issue
-   * @throws IllegalArgumentException if the file is a directory and has children
+   * @param files List of FileStatus for the files to check (and possibly delete)
+   * @return true iff successfully deleted all files
    */
-  private boolean checkAndDelete(FileStatus fStat) throws IOException, IllegalArgumentException {
-    Path filePath = fStat.getPath();
+  private boolean checkAndDeleteFiles(List<FileStatus> files) {
     // first check to see if the path is valid
-    if (!validate(filePath)) {
-      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it.");
-      boolean success = HBaseFileSystem.deleteDirFromFileSystem(fs, filePath);
-      if (!success) LOG.warn("Attempted to delete:" + filePath
-          + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
-
-      return success;
+    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
+    List<FileStatus> invalidFiles = Lists.newArrayList();
+    for (FileStatus file : files) {
+      if (validate(file.getPath())) {
+        validFiles.add(file);
+      } else {
+        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
+        invalidFiles.add(file);
+      }
     }
-    // check each of the cleaners for the file
+
+    Iterable<FileStatus> deletableValidFiles = validFiles;
+    // check each of the cleaners for the valid files
     for (T cleaner : cleanersChain) {
       if (cleaner.isStopped() || this.stopper.isStopped()) {
-        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any file
in:"
+        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more
files in:"
             + this.oldFileDir);
         return false;
       }
 
-      if (!cleaner.isFileDeletable(fStat)) {
-        // this file is not deletable, then we are done
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(filePath + " is not deletable according to:" + cleaner);
+      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
+      
+      // trace which cleaner is holding on to each file
+      if (LOG.isTraceEnabled()) {
+        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
+        for (FileStatus file : deletableValidFiles) {
+          if (!filteredFileSet.contains(file)) {
+            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
+          }
         }
-        return false;
       }
+      
+      deletableValidFiles = filteredFiles;
     }
-    // delete this file if it passes all the cleaners
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Removing:" + filePath + " from archive");
-    }
-    boolean success = HBaseFileSystem.deleteFileFromFileSystem(fs, filePath);
-    if (!success) {
-      LOG.warn("Attempted to delete:" + filePath
-          + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
+    
+    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+    int deletedFileCount = 0;
+    for (FileStatus file : filesToDelete) {
+      Path filePath = file.getPath();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Removing: " + filePath + " from archive");
+      }
+      try {
+        boolean success = HBaseFileSystem.deleteFileFromFileSystem(fs, filePath);
+        if (success) {
+          deletedFileCount++;
+        } else {
+          LOG.warn("Attempted to delete:" + filePath
+              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
+        }
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.warn("Error while deleting: " + filePath, e);
+      }
     }
-    return success;
+
+    return deletedFileCount == files.size();
   }
 
   @Override

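Two details of the rewritten chore path are easy to miss. First, the delegates now successively
narrow one candidate collection, so a file is deleted only if every cleaner in the chain lets it
through, while wrongly formatted files are deleted unconditionally. Second, Iterables.filter
returns a lazy view, which is why the trace block materializes it with ImmutableSet.copyOf before
comparing. A compressed sketch of the chain semantics, using the names from the patch:

    Iterable<FileStatus> deletable = validFiles;
    for (T cleaner : cleanersChain) {
      deletable = cleaner.getDeletableFiles(deletable);  // lazy view, evaluated on iteration
    }
    for (FileStatus f : Iterables.concat(invalidFiles, deletable)) {
      // attempt deletion; successes are tallied against files.size()
    }
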
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java?rev=1521662&r1=1521661&r2=1521662&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java Tue Sep 10 21:46:33 2013
@@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.Stoppable
 public interface FileCleanerDelegate extends Configurable, Stoppable {
 
   /**
-   * Should the master delete the file or keep it?
-   * @param fStat file status of the file to check
-   * @return <tt>true</tt> if the file is deletable, <tt>false</tt> if not
+   * Determines which of the given files are safe to delete
+   * @param files files to check for deletion
+   * @return files that are ok to delete according to this cleaner
    */
-  public boolean isFileDeletable(FileStatus fStat);
+  Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files);
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java?rev=1521662&r1=1521661&r2=1521662&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java Tue Sep 10 21:46:33 2013
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,11 +26,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
 
 /**
  * HFileLink cleaner that determines if a hfile should be deleted.

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1521662&r1=1521661&r2=1521662&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Tue Sep 10 21:46:33 2013
@@ -19,11 +19,14 @@
  */
 package org.apache.hadoop.hbase.replication.master;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -32,10 +35,11 @@ import org.apache.hadoop.hbase.replicati
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -44,7 +48,6 @@ import java.util.Set;
 public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
   private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
   private ReplicationZookeeper zkHelper;
-  private Set<String> hlogs = new HashSet<String>();
   private boolean stopped = false;
   private boolean aborted;
 
@@ -54,51 +57,49 @@ public class ReplicationLogCleaner exten
   public ReplicationLogCleaner() {}
 
   @Override
-  public boolean isLogDeletable(FileStatus fStat) {
-
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
     try {
       if (!zkHelper.getReplication()) {
-        return false;
+        return ImmutableList.of();
       }
     } catch (KeeperException e) {
       abort("Cannot get the state of replication", e);
-      return false;
+      return ImmutableList.of();
     }
 
-    // all members of this class are null if replication is disabled, and we
-    // return true since false would render the LogsCleaner useless
+   // all members of this class are null if replication is disabled, 
+   // so we cannot filter the files
     if (this.getConf() == null) {
-      return true;
-    }
-    String log = fStat.getPath().getName();
-    // If we saw the hlog previously, let's consider it's still used
-    // At some point in the future we will refresh the list and it will be gone
-    if (this.hlogs.contains(log)) {
-      return false;
+      return files;
     }
-
-    // Let's see it's still there
-    // This solution makes every miss very expensive to process since we
-    // almost completely refresh the cache each time
-    return !refreshHLogsAndSearch(log);
+    
+    final Set<String> hlogs = loadHLogsFromQueues();
+    return Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        String hlog = file.getPath().getName();
+        boolean logInReplicationQueue = hlogs.contains(hlog);
+        if (LOG.isDebugEnabled()) {
+          if (logInReplicationQueue) {
+            LOG.debug("Found log in ZK, keeping: " + hlog);
+          } else {
+            LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
+          }
+        }
+       return !logInReplicationQueue;
+      }});
   }
 
   /**
-   * Search through all the hlogs we have in ZK to refresh the cache
-   * If a log is specified and found, then we early out and return true
-   * @param searchedLog log we are searching for, pass null to cache everything
-   *                    that's in zookeeper.
-   * @return false until a specified log is found.
+   * Load all hlogs in all replication queues from ZK
    */
-  private boolean refreshHLogsAndSearch(String searchedLog) {
-    this.hlogs.clear();
-    final boolean lookForLog = searchedLog != null;
+  private Set<String> loadHLogsFromQueues() {
     List<String> rss = zkHelper.getListOfReplicators();
     if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, deleting: " +
-          searchedLog);
-      return false;
+      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+      return ImmutableSet.of();
     }
+    Set<String> hlogs = Sets.newHashSet();
     for (String rs: rss) {
       List<String> listOfPeers = zkHelper.getListPeersForRS(rs);
       // if rs just died, this will be null
@@ -108,23 +109,18 @@ public class ReplicationLogCleaner exten
       for (String id : listOfPeers) {
         List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id);
         if (peersHlogs != null) {
-          this.hlogs.addAll(peersHlogs);
-        }
-        // early exit if we found the log
-        if(lookForLog && this.hlogs.contains(searchedLog)) {
-          LOG.debug("Found log in ZK, keeping: " + searchedLog);
-          return true;
+          hlogs.addAll(peersHlogs);
         }
       }
     }
-    LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
-    return false;
+    return hlogs;
   }
 
   @Override
   public void setConf(Configuration config) {
     // If replication is disabled, keep all members null
     if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+      LOG.warn("Not configured - allowing all hlogs to be deleted");
       return;
     }
     // Make my own Configuration.  Then I'll have my own connection to zk that
@@ -139,7 +135,6 @@ public class ReplicationLogCleaner exten
     } catch (IOException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     }
-    refreshHLogsAndSearch(null);
   }
 
 

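The payoff of this rewrite is in ZooKeeper round trips. Previously every cache miss in
isLogDeletable called refreshHLogsAndSearch, which re-listed each replicator, each peer, and each
queue; with F old hlogs, R region servers, and P peers per server, one cleaner pass could cost on
the order of F × (1 + R + R·P) ZK list operations. loadHLogsFromQueues now runs once per batch,
so the same pass costs roughly 1 + R + R·P list operations plus an in-memory set lookup per file.
As illustrative arithmetic: F = 1,000, R = 100, P = 2 gives about 301,000 list calls before
versus about 301 after.
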
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1521662&r1=1521661&r2=1521662&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Sep 10 21:46:33 2013
@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -1262,7 +1261,7 @@ public abstract class FSUtils {
    * @param fs file system
    * @param dir directory
    * @param filter path filter
-   * @return null if tabledir doesn't exist, otherwise FileStatus array
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
    */
   public static FileStatus [] listStatus(final FileSystem fs,
       final Path dir, final PathFilter filter) throws IOException {
@@ -1283,7 +1282,7 @@ public abstract class FSUtils {
    *
    * @param fs file system
    * @param dir directory
-   * @return null if tabledir doesn't exist, otherwise FileStatus array
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
    */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
     return listStatus(fs, dir, null);
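
The javadoc fix above matters to callers: FSUtils.listStatus returns null both when the directory
is missing and when it exists but is empty (raw FileSystem.listStatus would typically hand back
an empty array in the empty case). Callers therefore null-check rather than length-check, exactly
as the new checkAndDeleteEntries does. A minimal caller sketch, where process(f) is a
hypothetical per-file handler:

    FileStatus[] files = FSUtils.listStatus(fs, dir);  // may be null
    if (files == null) {
      return;  // directory empty or missing: nothing to do
    }
    for (FileStatus f : files) {
      process(f);
    }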


