hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1518465 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/master/cleaner/ main/java/org/apache/hadoop/hbase/replication/master/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/backup/ex...
Date Thu, 29 Aug 2013 04:28:56 GMT
Author: stack
Date: Thu Aug 29 04:28:55 2013
New Revision: 1518465

URL: http://svn.apache.org/r1518465
Log:
HBASE-9208 ReplicationLogCleaner slow at large scale

Added:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java?rev=1518465&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
(added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
Thu Aug 29 04:28:55 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.BaseConfigurable;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * Base class for file cleaners which allows subclasses to implement a simple
+ * isFileDeletable method (which used to be the FileCleanerDelegate contract).
+ */
+public abstract class BaseFileCleanerDelegate extends BaseConfigurable
+implements FileCleanerDelegate {
+
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    return Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        return isFileDeletable(file);
+      }});
+  }
+
+  /**
+   * Should the master delete the file or keep it?
+   * @param fStat file status of the file to check
+   * @return <tt>true</tt> if the file is deletable, <tt>false</tt> if not
+   */
+  protected abstract boolean isFileDeletable(FileStatus fStat);
+
+}

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java?rev=1518465&r1=1518464&r2=1518465&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
Thu Aug 29 04:28:55 2013
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.BaseConfigurable;
 
 /**
  * Base class for the hfile cleaning function inside the master. By default, only the
@@ -36,8 +35,7 @@ import org.apache.hadoop.hbase.BaseConfi
  * provide a default constructor.
  */
 @InterfaceAudience.Private
-public abstract class BaseHFileCleanerDelegate extends BaseConfigurable implements
-    FileCleanerDelegate {
+public abstract class BaseHFileCleanerDelegate extends BaseFileCleanerDelegate {
 
   private boolean stopped = false;
 

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java?rev=1518465&r1=1518464&r2=1518465&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
Thu Aug 29 04:28:55 2013
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.master.c
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.BaseConfigurable;
 
 /**
  * Base class for the log cleaning function inside the master. By default, two
@@ -38,7 +36,7 @@ import org.apache.hadoop.hbase.BaseConfi
  * provide a default constructor.
  */
 @InterfaceAudience.Private
-public abstract class BaseLogCleanerDelegate extends BaseConfigurable implements FileCleanerDelegate {
+public abstract class BaseLogCleanerDelegate extends BaseFileCleanerDelegate {
 
   @Override
   public boolean isFileDeletable(FileStatus fStat) {

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java?rev=1518465&r1=1518464&r2=1518465&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
Thu Aug 29 04:28:55 2013
@@ -32,6 +32,11 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.util.FSUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 /**
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
@@ -97,7 +102,7 @@ public abstract class CleanerChore<T ext
    * @param conf
    * @return the new instance
    */
-  public T newFileCleaner(String className, Configuration conf) {
+  private T newFileCleaner(String className, Configuration conf) {
     try {
       Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
         FileCleanerDelegate.class);
@@ -115,135 +120,154 @@ public abstract class CleanerChore<T ext
   @Override
   protected void chore() {
     try {
-      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir, null);
-      // if the path (file or directory) doesn't exist, then we can just return
-      if (files == null) return;
-      // loop over the found files and see if they should be deleted
-      for (FileStatus file : files) {
-        try {
-          if (file.isDir()) checkAndDeleteDirectory(file.getPath());
-          else checkAndDelete(file);
-        } catch (IOException e) {
-          e = RemoteExceptionHandler.checkIOException(e);
-          LOG.warn("Error while cleaning the logs", e);
-        }
-      }
+      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
+      checkAndDeleteEntries(files);
     } catch (IOException e) {
-      LOG.warn("Failed to get status of: " + oldFileDir);
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while cleaning the logs", e);
     }
-
   }
 
   /**
+   * Loop over the given directory entries, and check whether they can be deleted.
+   * If an entry is itself a directory it will be recursively checked and deleted itself iff
+   * all subentries are deleted (and no new subentries are added in the meantime)
+   *
+   * @param entries directory entries to check
+   * @return true if all entries were successfully deleted
+   */
+  private boolean checkAndDeleteEntries(FileStatus[] entries) {
+    if (entries == null) {
+      return true;
+    }
+    boolean allEntriesDeleted = true;
+    List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
+    for (FileStatus child : entries) {
+      Path path = child.getPath();
+      if (child.isDir()) {
+        // for each subdirectory delete it and all entries if possible
+        if (!checkAndDeleteDirectory(path)) {
+          allEntriesDeleted = false;
+        }
+      } else {
+        // collect all files to attempt to delete in one batch
+        files.add(child);
+      }
+    }
+    if (!checkAndDeleteFiles(files)) {
+      allEntriesDeleted = false;
+    }
+    return allEntriesDeleted;
+  }
+  
+  /**
    * Attempt to delete a directory and all files under that directory. Each child file is
passed
    * through the delegates to see if it can be deleted. If the directory has no children
when the
    * cleaners have finished it is deleted.
    * <p>
    * If new children files are added between checks of the directory, the directory will
<b>not</b>
    * be deleted.
-   * @param toCheck directory to check
+   * @param dir directory to check
   * @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
-   * @throws IOException if there is an unexpected filesystem error
    */
-  public boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
+  @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Checking directory: " + toCheck);
-    }
-    FileStatus[] children = FSUtils.listStatus(fs, toCheck);
-    // if the directory doesn't exist, then we are done
-    if (children == null) {
-      try {
-        return fs.delete(toCheck, false);
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Couldn't delete directory: " + toCheck, e);
-        }
-      }
-      // couldn't delete w/o exception, so we can't return success.
-      return false;
+      LOG.trace("Checking directory: " + dir);
     }
 
-    boolean canDeleteThis = true;
-    for (FileStatus child : children) {
-      Path path = child.getPath();
-      // attempt to delete all the files under the directory
-      if (child.isDir()) {
-        if (!checkAndDeleteDirectory(path)) {
-          canDeleteThis = false;
-        }
-      }
-      // otherwise we can just check the file
-      else if (!checkAndDelete(child)) {
-        canDeleteThis = false;
-      }
+    try {
+      FileStatus[] children = FSUtils.listStatus(fs, dir);
+      boolean allChildrenDeleted = checkAndDeleteEntries(children);
+  
+      // if the directory still has children, we can't delete it, so we are done
+      if (!allChildrenDeleted) return false;
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while listing directory: " + dir, e);
+      // couldn't list directory, so don't try to delete, and don't return success
+      return false;
     }
 
-    // if the directory has children, we can't delete it, so we are done
-    if (!canDeleteThis) return false;
-
     // otherwise, all the children (that we know about) have been deleted, so we should try
to
     // delete this directory. However, don't do so recursively so we don't delete files that
have
     // been added since we last checked.
     try {
-      return fs.delete(toCheck, false);
+      return fs.delete(dir, false);
     } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Couldn't delete directory: " + toCheck, e);
+        LOG.trace("Couldn't delete directory: " + dir, e);
       }
+      // couldn't delete w/o exception, so we can't return success.
+      return false;
     }
-
-    // couldn't delete w/o exception, so we can't return success.
-    return false;
   }
 
   /**
-   * Run the given file through each of the cleaners to see if it should be deleted, deleting
it if
+   * Run the given files through each of the cleaners to see if they should be deleted, deleting them if
    * necessary.
-   * @param fStat path of the file to check (and possibly delete)
-   * @throws IOException if cann't delete a file because of a filesystem issue
-   * @throws IllegalArgumentException if the file is a directory and has children
+   * @param files List of FileStatus for the files to check (and possibly delete)
+   * @return true iff successfully deleted all files
    */
-  private boolean checkAndDelete(FileStatus fStat) throws IOException, IllegalArgumentException
{
-    Path filePath = fStat.getPath();
+  private boolean checkAndDeleteFiles(List<FileStatus> files) {
     // first check to see if the path is valid
-    if (!validate(filePath)) {
-      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it.");
-      boolean success = this.fs.delete(filePath, true);
-      if(!success)
-        LOG.warn("Attempted to delete: " + filePath
-            + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
-
-      return success;
+    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
+    List<FileStatus> invalidFiles = Lists.newArrayList();
+    for (FileStatus file : files) {
+      if (validate(file.getPath())) {
+        validFiles.add(file);
+      } else {
+        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
+        invalidFiles.add(file);
+      }
     }
 
-    // check each of the cleaners for the file
+    Iterable<FileStatus> deletableValidFiles = validFiles;
+    // check each of the cleaners for the valid files
     for (T cleaner : cleanersChain) {
       if (cleaner.isStopped() || this.stopper.isStopped()) {
-        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any file
in:"
+        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more
files in:"
             + this.oldFileDir);
         return false;
       }
 
-      if (!cleaner.isFileDeletable(fStat)) {
-        // this file is not deletable, then we are done
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(filePath + " is not deletable according to:" + cleaner);
+      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
+      
+      // trace which cleaner is holding on to each file
+      if (LOG.isTraceEnabled()) {
+        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
+        for (FileStatus file : deletableValidFiles) {
+          if (!filteredFileSet.contains(file)) {
+            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
+          }
         }
-        return false;
       }
+      
+      deletableValidFiles = filteredFiles;
     }
-    // delete this file if it passes all the cleaners
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Removing: " + filePath + " from archive");
-    }
-    boolean success = this.fs.delete(filePath, false);
-    if (!success) {
-      LOG.warn("Attempted to delete:" + filePath
-          + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
+    
+    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+    int deletedFileCount = 0;
+    for (FileStatus file : filesToDelete) {
+      Path filePath = file.getPath();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Removing: " + filePath + " from archive");
+      }
+      try {
+        boolean success = this.fs.delete(filePath, false);
+        if (success) {
+          deletedFileCount++;
+        } else {
+          LOG.warn("Attempted to delete:" + filePath
+              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
+        }
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.warn("Error while deleting: " + filePath, e);
+      }
     }
-    return success;
-  }
 
+    return deletedFileCount == files.size();
+  }
 
   @Override
   public void cleanup() {

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java?rev=1518465&r1=1518464&r2=1518465&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
Thu Aug 29 04:28:55 2013
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.Stoppable
 @InterfaceAudience.Private
 public interface FileCleanerDelegate extends Configurable, Stoppable {
   /**
-   * Should the master delete the file or keep it?
-   * @param fStat file status of the file to check
-   * @return <tt>true</tt> if the file is deletable, <tt>false</tt> if not
+   * Determines which of the given files are safe to delete
+   * @param files files to check for deletion
+   * @return files that are ok to delete according to this cleaner
    */
-  boolean isFileDeletable(FileStatus fStat);
+  Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files);
 }

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java?rev=1518465&r1=1518464&r2=1518465&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
Thu Aug 29 04:28:55 2013
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,11 +26,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
 
 /**
  * HFileLink cleaner that determines if a hfile should be deleted.

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java?rev=1518465&r1=1518464&r2=1518465&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
Thu Aug 29 04:28:55 2013
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 
 /**

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1518465&r1=1518464&r2=1518465&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
Thu Aug 29 04:28:55 2013
@@ -31,15 +31,18 @@ import org.apache.hadoop.hbase.master.cl
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
  * replication before deleting it when its TTL is over.
@@ -49,48 +52,45 @@ public class ReplicationLogCleaner exten
   private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
   private ZooKeeperWatcher zkw;
   private ReplicationQueuesClient replicationQueues;
-  private final Set<String> hlogs = new HashSet<String>();
   private boolean stopped = false;
   private boolean aborted;
 
 
   @Override
-  public boolean isLogDeletable(FileStatus fStat) {
-
-    // all members of this class are null if replication is disabled, and we
-    // return true since false would render the LogsCleaner useless
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+   // all members of this class are null if replication is disabled, 
+   // so we cannot filter the files
     if (this.getConf() == null) {
-      return true;
+      return files;
     }
-    String log = fStat.getPath().getName();
-    // If we saw the hlog previously, let's consider it's still used
-    // At some point in the future we will refresh the list and it will be gone
-    if (this.hlogs.contains(log)) {
-      return false;
-    }
-
-    // Let's see it's still there
-    // This solution makes every miss very expensive to process since we
-    // almost completely refresh the cache each time
-    return !refreshHLogsAndSearch(log);
+    
+    final Set<String> hlogs = loadHLogsFromQueues();
+    return Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        String hlog = file.getPath().getName();
+        boolean logInReplicationQueue = hlogs.contains(hlog);
+        if (LOG.isDebugEnabled()) {
+          if (logInReplicationQueue) {
+            LOG.debug("Found log in ZK, keeping: " + hlog);
+          } else {
+            LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
+          }
+        }
+       return !logInReplicationQueue;
+      }});
   }
 
   /**
-   * Search through all the hlogs we have in ZK to refresh the cache
-   * If a log is specified and found, then we early out and return true
-   * @param searchedLog log we are searching for, pass null to cache everything
-   *                    that's in zookeeper.
-   * @return false until a specified log is found.
+   * Load all hlogs in all replication queues from ZK
    */
-  private boolean refreshHLogsAndSearch(String searchedLog) {
-    this.hlogs.clear();
-    final boolean lookForLog = searchedLog != null;
+  private Set<String> loadHLogsFromQueues() {
     List<String> rss = replicationQueues.getListOfReplicators();
     if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, deleting: " +
-          searchedLog);
-      return false;
+      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+      return ImmutableSet.of();
     }
+    Set<String> hlogs = Sets.newHashSet();
     for (String rs: rss) {
       List<String> listOfPeers = replicationQueues.getAllQueues(rs);
       // if rs just died, this will be null
@@ -100,23 +100,18 @@ public class ReplicationLogCleaner exten
       for (String id : listOfPeers) {
         List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
         if (peersHlogs != null) {
-          this.hlogs.addAll(peersHlogs);
-        }
-        // early exit if we found the log
-        if(lookForLog && this.hlogs.contains(searchedLog)) {
-          LOG.debug("Found log in ZK, keeping: " + searchedLog);
-          return true;
+          hlogs.addAll(peersHlogs);
         }
       }
     }
-    LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
-    return false;
+    return hlogs;
   }
 
   @Override
   public void setConf(Configuration config) {
     // If replication is disabled, keep all members null
     if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+      LOG.warn("Not configured - allowing all hlogs to be deleted");
       return;
     }
     // Make my own Configuration.  Then I'll have my own connection to zk that
@@ -132,10 +127,8 @@ public class ReplicationLogCleaner exten
     } catch (IOException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     }
-    refreshHLogsAndSearch(null);
   }
 
-
   @Override
   public void stop(String why) {
     if (this.stopped) return;

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1518465&r1=1518464&r2=1518465&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
Thu Aug 29 04:28:55 2013
@@ -1575,7 +1575,7 @@ public abstract class FSUtils {
    * @param fs file system
    * @param dir directory
    * @param filter path filter
-   * @return null if tabledir doesn't exist, otherwise FileStatus array
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
    */
   public static FileStatus [] listStatus(final FileSystem fs,
       final Path dir, final PathFilter filter) throws IOException {
@@ -1598,7 +1598,7 @@ public abstract class FSUtils {
    *
    * @param fs file system
    * @param dir directory
-   * @return null if tabledir doesn't exist, otherwise FileStatus array
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
    */
   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException
{
     return listStatus(fs, dir, null);

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java?rev=1518465&r1=1518464&r2=1518465&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
(original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
Thu Aug 29 04:28:55 2013
@@ -329,19 +329,20 @@ public class TestZooKeeperTableArchiveCl
     BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
     final int[] counter = new int[] { 0 };
     final CountDownLatch finished = new CountDownLatch(1);
-    Mockito.doAnswer(new Answer<Boolean>() {
+    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {
 
       @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable
{
         counter[0]++;
-        LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to isFileDeletable for
file: "
+        LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for
files: "
             + invocation.getArguments()[0]);
 
-        Boolean ret = (Boolean) invocation.callRealMethod();
+        @SuppressWarnings("unchecked")
+        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
         if (counter[0] >= expected) finished.countDown();
         return ret;
       }
-    }).when(delegateSpy).isFileDeletable(Mockito.any(FileStatus.class));
+    }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
     cleaners.set(0, delegateSpy);
 
     return finished;



Mime
View raw message